00001
00002
00003
00004
00005
00006
00007
00008
00009
00032 #include <string.h>
00033 #include <stdarg.h>
00034 #include "PCU.h"
00035 #include "pcu_msg.h"
00036 #include "pcu_pmpi.h"
00037 #include "pcu_order.h"
00038 #include "noto_malloc.h"
00039 #include "reel.h"
00040
00041 enum state { uninit, init };
00042 static enum state global_state = uninit;
00043 static pcu_msg global_pmsg;
00044
00045 static pcu_msg* get_msg()
00046 {
00047 return &global_pmsg;
00048 }
00049
00055 int PCU_Comm_Init(void)
00056 {
00057 if (global_state != uninit)
00058 reel_fail("nested calls to Comm_Init");
00059 pcu_pmpi_init(MPI_COMM_WORLD);
00060 pcu_set_mpi(&pcu_pmpi);
00061 pcu_make_msg(&global_pmsg);
00062 global_state = init;
00063
00064
00065
00066 PCU_Comm_Order(true);
00067 return PCU_SUCCESS;
00068 }
00069
00074 int PCU_Comm_Free(void)
00075 {
00076 if (global_state == uninit)
00077 reel_fail("Comm_Free called before Comm_Init");
00078 if (global_pmsg.order)
00079 pcu_order_free(global_pmsg.order);
00080 pcu_free_msg(&global_pmsg);
00081 pcu_pmpi_finalize();
00082 global_state = uninit;
00083 return PCU_SUCCESS;
00084 }
00085
00095 int PCU_Comm_Self(void)
00096 {
00097 if (global_state == uninit)
00098 reel_fail("Comm_Self called before Comm_Init");
00099 return pcu_mpi_rank();
00100 }
00101
00106 int PCU_Comm_Peers(void)
00107 {
00108 if (global_state == uninit)
00109 reel_fail("Comm_Peers called before Comm_Init");
00110 return pcu_mpi_size();
00111 }
00112
00119 void PCU_Comm_Begin(void)
00120 {
00121 if (global_state == uninit)
00122 reel_fail("Comm_Begin called before Comm_Init");
00123 pcu_msg_start(get_msg());
00124 }
00125
00132 int PCU_Comm_Pack(int to_rank, const void* data, size_t size)
00133 {
00134 if (global_state == uninit)
00135 reel_fail("Comm_Pack called before Comm_Init");
00136 if ((to_rank < 0)||(to_rank >= pcu_mpi_size()))
00137 reel_fail("Invalid rank in Comm_Pack");
00138 memcpy(pcu_msg_pack(get_msg(),to_rank,size),data,size);
00139 return PCU_SUCCESS;
00140 }
00141
00149 int PCU_Comm_Send(void)
00150 {
00151 if (global_state == uninit)
00152 reel_fail("Comm_Send called before Comm_Init");
00153 pcu_msg_send(get_msg());
00154 return PCU_SUCCESS;
00155 }
00156
00168 bool PCU_Comm_Listen(void)
00169 {
00170 if (global_state == uninit)
00171 reel_fail("Comm_Listen called before Comm_Init");
00172 pcu_msg* m = get_msg();
00173 if (m->order)
00174 return pcu_order_receive(m->order, m);
00175 return pcu_msg_receive(m);
00176 }
00177
00181 int PCU_Comm_Sender(void)
00182 {
00183 if (global_state == uninit)
00184 reel_fail("Comm_Sender called before Comm_Init");
00185 pcu_msg* m = get_msg();
00186 if (m->order)
00187 return pcu_order_received_from(m->order);
00188 return pcu_msg_received_from(m);
00189 }
00190
00194 bool PCU_Comm_Unpacked(void)
00195 {
00196 if (global_state == uninit)
00197 reel_fail("Comm_Unpacked called before Comm_Init");
00198 pcu_msg* m = get_msg();
00199 if (m->order)
00200 return pcu_order_unpacked(m->order);
00201 return pcu_msg_unpacked(m);
00202 }
00203
00214 int PCU_Comm_Unpack(void* data, size_t size)
00215 {
00216 if (global_state == uninit)
00217 reel_fail("Comm_Unpack called before Comm_Init");
00218 pcu_msg* m = get_msg();
00219 if (m->order)
00220 memcpy(data,pcu_order_unpack(m->order,size),size);
00221 else
00222 memcpy(data,pcu_msg_unpack(m,size),size);
00223 return PCU_SUCCESS;
00224 }
00225
00226 void PCU_Comm_Order(bool on)
00227 {
00228 if (global_state == uninit)
00229 reel_fail("Comm_Order called before Comm_Init");
00230 pcu_msg* m = get_msg();
00231 if (on && (!m->order))
00232 m->order = pcu_order_new();
00233 if ((!on) && m->order) {
00234 pcu_order_free(m->order);
00235 m->order = NULL;
00236 }
00237 }
00238
00240 void PCU_Barrier(void)
00241 {
00242 if (global_state == uninit)
00243 reel_fail("Barrier called before Comm_Init");
00244 pcu_barrier(&(get_msg()->coll));
00245 }
00246
00253 void PCU_Add_Doubles(double* p, size_t n)
00254 {
00255 if (global_state == uninit)
00256 reel_fail("Add_Doubles called before Comm_Init");
00257 pcu_allreduce(&(get_msg()->coll),pcu_add_doubles,p,n*sizeof(double));
00258 }
00259
00260 double PCU_Add_Double(double x)
00261 {
00262 double a[1];
00263 a[0] = x;
00264 PCU_Add_Doubles(a, 1);
00265 return a[0];
00266 }
00267
00270 void PCU_Min_Doubles(double* p, size_t n)
00271 {
00272 if (global_state == uninit)
00273 reel_fail("Min_Doubles called before Comm_Init");
00274 pcu_allreduce(&(get_msg()->coll),pcu_min_doubles,p,n*sizeof(double));
00275 }
00276
00277 double PCU_Min_Double(double x)
00278 {
00279 double a[1];
00280 a[0] = x;
00281 PCU_Min_Doubles(a, 1);
00282 return a[0];
00283 }
00284
00287 void PCU_Max_Doubles(double* p, size_t n)
00288 {
00289 if (global_state == uninit)
00290 reel_fail("Max_Doubles called before Comm_Init");
00291 pcu_allreduce(&(get_msg()->coll),pcu_max_doubles,p,n*sizeof(double));
00292 }
00293
00294 double PCU_Max_Double(double x)
00295 {
00296 double a[1];
00297 a[0] = x;
00298 PCU_Max_Doubles(a, 1);
00299 return a[0];
00300 }
00301
00304 void PCU_Add_Ints(int* p, size_t n)
00305 {
00306 if (global_state == uninit)
00307 reel_fail("Add_Ints called before Comm_Init");
00308 pcu_allreduce(&(get_msg()->coll),pcu_add_ints,p,n*sizeof(int));
00309 }
00310
00311 int PCU_Add_Int(int x)
00312 {
00313 int a[1];
00314 a[0] = x;
00315 PCU_Add_Ints(a, 1);
00316 return a[0];
00317 }
00318
00321 void PCU_Add_Longs(long* p, size_t n)
00322 {
00323 if (global_state == uninit)
00324 reel_fail("Add_Longs called before Comm_Init");
00325 pcu_allreduce(&(get_msg()->coll),pcu_add_longs,p,n*sizeof(long));
00326 }
00327
00328 long PCU_Add_Long(long x)
00329 {
00330 long a[1];
00331 a[0] = x;
00332 PCU_Add_Longs(a, 1);
00333 return a[0];
00334 }
00335
00338 void PCU_Add_SizeTs(size_t* p, size_t n)
00339 {
00340 if (global_state == uninit)
00341 reel_fail("Add_SizeTs called before Comm_Init");
00342 pcu_allreduce(&(get_msg()->coll),pcu_add_sizets,p,n*sizeof(size_t));
00343 }
00344
00345 size_t PCU_Add_SizeT(size_t x)
00346 {
00347 size_t a[1];
00348 a[0] = x;
00349 PCU_Add_SizeTs(a, 1);
00350 return a[0];
00351 }
00352
00355 void PCU_Min_SizeTs(size_t* p, size_t n) {
00356 if (global_state == uninit)
00357 reel_fail("Min_SizeTs called before Comm_Init");
00358 pcu_allreduce(&(get_msg()->coll),pcu_min_sizets,p,n*sizeof(size_t));
00359 }
00360
00361 size_t PCU_Min_SizeT(size_t x) {
00362 size_t a[1];
00363 a[0] = x;
00364 PCU_Min_SizeTs(a, 1);
00365 return a[0];
00366 }
00367
00370 void PCU_Max_SizeTs(size_t* p, size_t n) {
00371 if (global_state == uninit)
00372 reel_fail("Max_SizeTs called before Comm_Init");
00373 pcu_allreduce(&(get_msg()->coll),pcu_max_sizets,p,n*sizeof(size_t));
00374 }
00375
00376 size_t PCU_Max_SizeT(size_t x) {
00377 size_t a[1];
00378 a[0] = x;
00379 PCU_Max_SizeTs(a, 1);
00380 return a[0];
00381 }
00382
00389 void PCU_Exscan_Ints(int* p, size_t n)
00390 {
00391 if (global_state == uninit)
00392 reel_fail("Exscan_Ints called before Comm_Init");
00393 int* originals;
00394 NOTO_MALLOC(originals,n);
00395 for (size_t i=0; i < n; ++i)
00396 originals[i] = p[i];
00397 pcu_scan(&(get_msg()->coll),pcu_add_ints,p,n*sizeof(int));
00398
00399 for (size_t i=0; i < n; ++i)
00400 p[i] -= originals[i];
00401 noto_free(originals);
00402 }
00403
00404 int PCU_Exscan_Int(int x)
00405 {
00406 int a[1];
00407 a[0] = x;
00408 PCU_Exscan_Ints(a, 1);
00409 return a[0];
00410 }
00411
00413 void PCU_Exscan_Longs(long* p, size_t n)
00414 {
00415 if (global_state == uninit)
00416 reel_fail("Exscan_Longs called before Comm_Init");
00417 long* originals;
00418 NOTO_MALLOC(originals,n);
00419 for (size_t i=0; i < n; ++i)
00420 originals[i] = p[i];
00421 pcu_scan(&(get_msg()->coll),pcu_add_longs,p,n*sizeof(long));
00422
00423 for (size_t i=0; i < n; ++i)
00424 p[i] -= originals[i];
00425 noto_free(originals);
00426 }
00427
00428 long PCU_Exscan_Long(long x)
00429 {
00430 long a[1];
00431 a[0] = x;
00432 PCU_Exscan_Longs(a, 1);
00433 return a[0];
00434 }
00435
00438 void PCU_Min_Ints(int* p, size_t n)
00439 {
00440 if (global_state == uninit)
00441 reel_fail("Min_Ints called before Comm_Init");
00442 pcu_allreduce(&(get_msg()->coll),pcu_min_ints,p,n*sizeof(int));
00443 }
00444
00445 int PCU_Min_Int(int x)
00446 {
00447 int a[1];
00448 a[0] = x;
00449 PCU_Min_Ints(a, 1);
00450 return a[0];
00451 }
00452
00455 void PCU_Max_Ints(int* p, size_t n)
00456 {
00457 if (global_state == uninit)
00458 reel_fail("Max_Ints called before Comm_Init");
00459 pcu_allreduce(&(get_msg()->coll),pcu_max_ints,p,n*sizeof(int));
00460 }
00461
00462 int PCU_Max_Int(int x)
00463 {
00464 int a[1];
00465 a[0] = x;
00466 PCU_Max_Ints(a, 1);
00467 return a[0];
00468 }
00469
00472 int PCU_Or(int c)
00473 {
00474 return PCU_Max_Int(c);
00475 }
00476
00479 int PCU_And(int c)
00480 {
00481 return PCU_Min_Int(c);
00482 }
00483
00486 int PCU_Proc_Self(void)
00487 {
00488 if (global_state == uninit)
00489 reel_fail("Proc_Self called before Comm_Init");
00490 return pcu_pmpi_rank();
00491 }
00492
00495 int PCU_Proc_Peers(void)
00496 {
00497 if (global_state == uninit)
00498 reel_fail("Proc_Peers called before Comm_Init");
00499 return pcu_pmpi_size();
00500 }
00501
00504 int PCU_Comm_Rank(int* rank)
00505 {
00506 if (global_state == uninit)
00507 reel_fail("Comm_Rank called before Comm_Init");
00508 *rank = pcu_mpi_rank();
00509 return PCU_SUCCESS;
00510 }
00511
00513 int PCU_Comm_Size(int* size)
00514 {
00515 if (global_state == uninit)
00516 reel_fail("Comm_Size called before Comm_Init");
00517 *size = pcu_mpi_size();
00518 return PCU_SUCCESS;
00519 }
00520
00522 bool PCU_Comm_Initialized(void)
00523 {
00524 return global_state == init;
00525 }
00526
00529 int PCU_Comm_Start(PCU_Method method)
00530 {
00531 (void)method;
00532 if (global_state == uninit)
00533 reel_fail("Comm_Start called before Comm_Init");
00534 pcu_msg_start(get_msg());
00535 return PCU_SUCCESS;
00536 }
00537
00543 int PCU_Comm_Packed(int to_rank, size_t* size)
00544 {
00545 if (global_state == uninit)
00546 reel_fail("Comm_Packed called before Comm_Init");
00547 if ((to_rank < 0)||(to_rank >= pcu_mpi_size()))
00548 reel_fail("Invalid rank in Comm_Packed");
00549 *size = pcu_msg_packed(get_msg(),to_rank);
00550 return PCU_SUCCESS;
00551 }
00552
00562 int PCU_Comm_Write(int to_rank, const void* data, size_t size)
00563 {
00564 if (global_state == uninit)
00565 reel_fail("Comm_Write called before Comm_Init");
00566 if ((to_rank < 0)||(to_rank >= pcu_mpi_size()))
00567 reel_fail("Invalid rank in Comm_Write");
00568 pcu_msg* msg = get_msg();
00569 PCU_MSG_PACK(msg,to_rank,size);
00570 memcpy(pcu_msg_pack(msg,to_rank,size),data,size);
00571 return PCU_SUCCESS;
00572 }
00573
00575 bool PCU_Comm_Receive(void)
00576 {
00577 while (PCU_Comm_Unpacked())
00578 if (!PCU_Comm_Listen())
00579 return false;
00580 return true;
00581 }
00582
00595 bool PCU_Comm_Read(int* from_rank, void** data, size_t* size)
00596 {
00597 if (global_state == uninit)
00598 reel_fail("Comm_Read called before Comm_Init");
00599 if (!PCU_Comm_Receive())
00600 return false;
00601 *from_rank = PCU_Comm_Sender();
00602 PCU_COMM_UNPACK(*size);
00603 *data = PCU_Comm_Extract(*size);
00604 return true;
00605 }
00606
00608 void PCU_Debug_Open(void)
00609 {
00610 if (global_state == uninit)
00611 reel_fail("Debug_Open called before Comm_Init");
00612 pcu_msg* msg = get_msg();
00613 if ( ! msg->file)
00614 msg->file = pcu_open_parallel("debug","txt");
00615 }
00616
00618 void PCU_Debug_Print(const char* format, ...)
00619 {
00620 if (global_state == uninit)
00621 reel_fail("Debug_Print called before Comm_Init");
00622 pcu_msg* msg = get_msg();
00623 if ( ! msg->file)
00624 return;
00625 va_list ap;
00626 va_start(ap,format);
00627 vfprintf(msg->file,format,ap);
00628 va_end(ap);
00629 fflush(msg->file);
00630 }
00631
00633 int PCU_Comm_From(int* from_rank)
00634 {
00635 if (global_state == uninit)
00636 reel_fail("Comm_From called before Comm_Init");
00637 pcu_msg* m = get_msg();
00638 if (m->order)
00639 *from_rank = pcu_order_received_from(m->order);
00640 else
00641 *from_rank = pcu_msg_received_from(m);
00642 return PCU_SUCCESS;
00643 }
00644
00650 int PCU_Comm_Received(size_t* size)
00651 {
00652 if (global_state == uninit)
00653 reel_fail("Comm_Received called before Comm_Init");
00654 pcu_msg* m = get_msg();
00655 if (m->order)
00656 *size = pcu_order_received_size(m->order);
00657 else
00658 *size = pcu_msg_received_size(m);
00659 return PCU_SUCCESS;
00660 }
00661
00668 void* PCU_Comm_Extract(size_t size)
00669 {
00670 if (global_state == uninit)
00671 reel_fail("Comm_Extract called before Comm_Init");
00672 pcu_msg* m = get_msg();
00673 if (m->order)
00674 return pcu_order_unpack(m->order,size);
00675 return pcu_msg_unpack(m,size);
00676 }
00677
00685 void PCU_Switch_Comm(MPI_Comm new_comm)
00686 {
00687 if (global_state == uninit)
00688 reel_fail("Switch_Comm called before Comm_Init");
00689 pcu_pmpi_switch(new_comm);
00690 }
00691
00697 MPI_Comm PCU_Get_Comm(void)
00698 {
00699 if (global_state == uninit)
00700 reel_fail("Get_Comm called before Comm_Init");
00701 return pcu_pmpi_comm();
00702 }
00703
00706 double PCU_Time(void)
00707 {
00708 return MPI_Wtime();
00709 }
00710
00711 void PCU_Protect(void)
00712 {
00713 reel_protect();
00714 }