00001 #ifndef PMST_CLUSTER_MAKER_CPP
00002 #define PMST_CLUSTER_MAKER_CPP
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "PMSTClusterMaker.h"
00038 #include "ESTAnalyzer.h"
00039 #include "MSTMultiListCache.h"
00040 #include "MSTHeapCache.h"
00041 #include "PMSTHeapCache.h"
00042 #include "EST.h"
00043 #include "MPIStats.h"
00044 #include "HeuristicChain.h"
00045 #include "Heuristic.h"
00046 #include "BipartiteData.h"
00047 #include <fstream>
00048 #include <sstream>
00049 #include <list>
00050 #include <cmath>
00051
00052
00053 #define MANAGER_RANK 0
00054
00055
00056 #define NO_ERROR 0
00057
00058
00059 char PDefCacheType[10] = "heap";
00060
00061
00062 int PMSTClusterMaker::cacheSize = 128;
00063 bool PMSTClusterMaker::noCacheRepop = true;
00064 bool PMSTClusterMaker::strictOrder = false;
00065 char* PMSTClusterMaker::inputMSTFile = NULL;
00066 char* PMSTClusterMaker::outputMSTFile = NULL;
00067 bool PMSTClusterMaker::dontCluster = false;
00068 bool PMSTClusterMaker::prettyPrint = false;
00069 double PMSTClusterMaker::percentile = 1.0;
00070 int PMSTClusterMaker::maxUse = 0;
00071 char* PMSTClusterMaker::cacheType = PDefCacheType;
00072
00073
00074 arg_parser::arg_record PMSTClusterMaker::argsList[] = {
00075 {"--cache", "#similarity metrics to cache per EST",
00076 &PMSTClusterMaker::cacheSize, arg_parser::INTEGER},
00077 {"--no-cache-repop", "Suppress EST cache repopulation",
00078 &PMSTClusterMaker::noCacheRepop, arg_parser::BOOLEAN},
00079 {"--percentile", "Percentile deviation to use to compute threshold value",
00080 &PMSTClusterMaker::percentile, arg_parser::DOUBLE},
00081 {"--no-order", "Disable strict order of processing messages",
00082 &PMSTClusterMaker::strictOrder, arg_parser::BOOLEAN},
00083 {"--input-mst-file", "Read MST data from file (skip parallel MST building)",
00084 &PMSTClusterMaker::inputMSTFile, arg_parser::STRING},
00085 {"--output-mst-file", "Output MST data to file",
00086 &PMSTClusterMaker::outputMSTFile, arg_parser::STRING},
00087 {"--dont-cluster", "Just generate MST data. Don't do clustering",
00088 &PMSTClusterMaker::dontCluster, arg_parser::BOOLEAN},
00089 {"--pretty-print", "Print a pretty cluster tree.",
00090 &PMSTClusterMaker::prettyPrint, arg_parser::BOOLEAN},
00091 {"--maxUse", "Set a threshold to aggressively use metrics (default=0)",
00092 &PMSTClusterMaker::maxUse, arg_parser::INTEGER},
00093 {"--cacheType", "Set type of cache (heap or mlist) to use (default=heap)",
00094 &PMSTClusterMaker::cacheType, arg_parser::STRING},
00095 {NULL, NULL, NULL, arg_parser::BOOLEAN}
00096 };
00097
00098 PMSTClusterMaker::PMSTClusterMaker(ESTAnalyzer *analyzer,
00099 const int refESTidx,
00100 const std::string& outputFile)
00101 : ClusterMaker("pmst", analyzer, refESTidx, outputFile), mst(NULL) {
00102
00103 }
00104
00105 PMSTClusterMaker::~PMSTClusterMaker() {
00106 if (mst != NULL) {
00107 delete mst;
00108 }
00109 }
00110
00111 void
00112 PMSTClusterMaker::showArguments(std::ostream& os) {
00113 ClusterMaker::showArguments(os);
00114
00115 os << "Options for " << name << " are:\n";
00116 arg_parser ap(PMSTClusterMaker::argsList);
00117 os << ap;
00118 }
00119
00120 bool
00121 PMSTClusterMaker::parseArguments(int& argc, char **argv) {
00122 arg_parser ap(PMSTClusterMaker::argsList);
00123 ap.check_args(argc, argv, false);
00124
00125 if (cacheSize < 1) {
00126 std::cerr << "Invalid cache size (must be greater than zero)\n";
00127 return false;
00128 }
00129
00130 strictOrder = !strictOrder;
00131
00132 return ClusterMaker::parseArguments(argc, argv);
00133 }
00134
00135
00136 void
00137 PMSTClusterMaker::estAdded(const int estIdx, std::vector<int>& repopulateList) {
00138
00139 sendToWorkers(estIdx, ADD_EST);
00140
00141
00142 cache->pruneCaches(estIdx, repopulateList, false);
00143
00144
00145 const int MyRank = MPI_GET_RANK();
00146 const int WorkerCount = pData->getWorkerCount();
00147 for(int workerID = MyRank+1; (workerID <= MyRank+WorkerCount);
00148 workerID++) {
00149
00150
00151 const int sourceRank = (strictOrder ? workerID : MPI_ANY_SOURCE);
00152 MPI_STATUS msgInfo;
00153 MPI_PROBE(sourceRank, REPOPULATE_REQUEST, msgInfo);
00154
00155
00156 const int dataSize = msgInfo.Get_count(MPI_TYPE_INT);
00157 int *requestData = new int[dataSize];
00158 MPI_RECV(requestData, dataSize, MPI_TYPE_INT,
00159 msgInfo.Get_source(), REPOPULATE_REQUEST);
00160
00161 if (requestData[0] > 0) {
00162 std::copy(requestData + 1, requestData + dataSize - 1,
00163 std::back_inserter(repopulateList));
00164
00165 }
00166
00167 delete [] requestData;
00168 }
00169 }
00170
00171 int
00172 PMSTClusterMaker::managerUpdateCaches(int estIdx, const bool refreshEST) {
00173
00174
00175 std::vector<int> repopulateList;
00176 if (estIdx != -1) {
00177 estAdded(estIdx, repopulateList);
00178 }
00179
00180
00181
00182 if (refreshEST) {
00183 repopulateList.push_back(estIdx);
00184 }
00185
00186 for(std::vector<int>::iterator curr = repopulateList.begin();
00187 (curr != repopulateList.end()); curr++) {
00188
00189 sendToWorkers(*curr, COMPUTE_SIMILARITY_REQUEST);
00190
00191 populateCache(*curr);
00192
00193
00194 const int ownerRank = getOwnerProcess(*curr);
00195 if (ownerRank != MPI_GET_RANK()) {
00196 int dummy;
00197 TRACK_IDLE_TIME(MPI_RECV(&dummy, 1, MPI_TYPE_INT, ownerRank,
00198 SIMILARITY_COMPUTATION_DONE));
00199 }
00200 }
00201
00202 return 0;
00203 }
00204
00205 void
00206 PMSTClusterMaker::computeNextESTidx(int& parentESTidx, int& estToAdd,
00207 float& similarity, int& alignmentData)const {
00208
00209
00210 sendToWorkers(-1, COMPUTE_MAX_SIMILARITY_REQUEST);
00211
00212 cache->getBestEntry(parentESTidx, estToAdd, similarity, alignmentData);
00213
00214 const int MyRank = MPI_GET_RANK();
00215 const int WorkerCount = pData->getWorkerCount();
00216
00217 for(int rank = MyRank+1; (rank <= MyRank+WorkerCount); rank++) {
00218
00219 const int workerRank = (strictOrder ? rank : MPI_ANY_SOURCE);
00220
00221 int remoteData[4] = {0, 0, 0, 0};
00222 TRACK_IDLE_TIME(MPI_RECV(remoteData, 4, MPI_TYPE_INT,
00223 workerRank, MAX_SIMILARITY_RESPONSE));
00224
00225 const float remoteSim = *((float *) (remoteData + 2));
00226
00227 if (analyzer->compareMetrics(remoteSim, similarity) ||
00228 (estToAdd == -1)) {
00229
00230
00231 similarity = remoteSim;
00232 parentESTidx = remoteData[0];
00233 estToAdd = remoteData[1];
00234 alignmentData= remoteData[3];
00235 }
00236 }
00237 }
00238
00239 void
00240 PMSTClusterMaker::addMoreChildESTs(const int parentESTidx, int& estToAdd,
00241 float &metric, int& alignmentData,
00242 int& pendingESTs) {
00243
00244 int newParent = -1;
00245
00246 do {
00247
00248 ASSERT ( estToAdd != -1 );
00249 mst->addNode(parentESTidx, estToAdd, metric, alignmentData);
00250 if (--pendingESTs == 0) {
00251
00252
00253 break;
00254 }
00255
00256
00257 managerUpdateCaches(estToAdd, false);
00258
00259
00260 int newChild = -1;
00261 float childMetric = 0;
00262 int childAlignment = 0;
00263 computeNextESTidx(newParent, newChild, childMetric, childAlignment);
00264
00265 if ((newParent == parentESTidx) &&
00266 (analyzer->compareMetrics(childMetric, (float) maxUse))) {
00267
00268
00269 estToAdd = newChild;
00270 metric = childMetric;
00271 alignmentData = childAlignment;
00272 } else {
00273
00274 newParent = -1;
00275 }
00276 } while (newParent == parentESTidx);
00277 }
00278
00279 int
00280 PMSTClusterMaker::manager() {
00281
00282 int pendingESTs = pData->estCount;
00283
00284 int dummy;
00285 mst = new MST(pendingESTs, analyzer->getAlignmentData(dummy));
00286
00287 int parentESTidx = -1;
00288 int estToAdd = pData->startESTidx;
00289 float metric = 0;
00290 int alignmentInfo = 0;
00291 do {
00292
00293 if ((maxUse == -1) || (parentESTidx == -1)) {
00294 ASSERT ( estToAdd != -1 );
00295 mst->addNode(parentESTidx, estToAdd, metric, alignmentInfo);
00296 if (--pendingESTs == 0) {
00297
00298
00299 break;
00300 }
00301 }
00302
00303
00304 managerUpdateCaches(estToAdd);
00305
00306
00307
00308 computeNextESTidx(parentESTidx, estToAdd, metric, alignmentInfo);
00309 ASSERT( parentESTidx != -1 );
00310 ASSERT( estToAdd != -1 );
00311 if (maxUse != -1) {
00312
00313
00314 addMoreChildESTs(parentESTidx, estToAdd, metric,
00315 alignmentInfo, pendingESTs);
00316 }
00317 } while (pendingESTs > 0);
00318
00319
00320
00321 sendToWorkers(-1, ADD_EST);
00322 printf("Complete %d\n", MPI_GET_RANK());
00323
00324 return NO_ERROR;
00325 }
00326
00327 int
00328 PMSTClusterMaker::worker() {
00329
00330
00331
00332 int estAdded = -1;
00333
00334
00335 MPI_STATUS msgInfo;
00336 const int LocalManagerRank = pData->getPartitionManager();
00337 do {
00338
00339
00340 MPI_PROBE(LocalManagerRank, MPI_ANY_TAG, msgInfo);
00341 if (msgInfo.Get_tag() == COMPUTE_SIMILARITY_REQUEST) {
00342
00343
00344
00345 int estIdx = -1;
00346 MPI_RECV(&estIdx, 1, MPI_TYPE_INT, LocalManagerRank,
00347 COMPUTE_SIMILARITY_REQUEST);
00348
00349 populateCache(estIdx);
00350 } else if (msgInfo.Get_tag() == COMPUTE_MAX_SIMILARITY_REQUEST) {
00351
00352
00353
00354 int dummy = 0;
00355 MPI_RECV(&dummy, 1, MPI_TYPE_INT, LocalManagerRank,
00356 COMPUTE_MAX_SIMILARITY_REQUEST);
00357 int bestEntry[4];
00358 float similarity = 0;
00359
00360 cache->getBestEntry(bestEntry[0], bestEntry[1],
00361 similarity, bestEntry[3]);
00362
00363
00364
00365 int *temp = reinterpret_cast<int*>(&similarity);
00366 bestEntry[2] = *temp;
00367 MPI_SEND(bestEntry, 4, MPI_TYPE_INT, LocalManagerRank,
00368 MAX_SIMILARITY_RESPONSE);
00369 } else if (msgInfo.Get_tag() == ADD_EST) {
00370
00371 MPI_RECV(&estAdded, 1, MPI_TYPE_INT, LocalManagerRank, ADD_EST);
00372 if (estAdded == -1) {
00373
00374
00375 break;
00376 }
00377
00378
00379 std::vector<int> repopulateList;
00380 cache->pruneCaches(estAdded, repopulateList);
00381
00382 MPI_SEND(&repopulateList[0], repopulateList.size(),
00383 MPI_TYPE_INT, LocalManagerRank, REPOPULATE_REQUEST);
00384 }
00385 } while (estAdded != -1);
00386
00387 return NO_ERROR;
00388 }
00389
00390 int
00391 PMSTClusterMaker::getOwnerProcess(const int estIdx) const {
00392 const int MyRank = MPI_GET_RANK();
00393 const int ProcessCount = pData->getWorkerCount() + 1;
00394 if (ProcessCount > 1) {
00395
00396
00397 BipartiteData* bpData = static_cast<BipartiteData*> (pData);
00398 int startIdx1 = bpData->partition1->startESTidx;
00399 int startIdx2 = bpData->partition2->startESTidx;
00400 int rank = 0;
00401 if (estIdx < startIdx2) {
00402 int factor = bpData->partition1->estCount / ProcessCount;
00403 int rem = bpData->partition1->estCount % ProcessCount;
00404 rank = (estIdx - startIdx1 - rem) / factor;
00405 if (rank < 0) rank = 0;
00406
00407
00408 } else {
00409 int factor = bpData->partition2->estCount / ProcessCount;
00410 int rem = bpData->partition2->estCount % ProcessCount;
00411 rank = (estIdx - startIdx2 - rem) / factor;
00412 if (rank < 0) rank = 0;
00413
00414
00415 }
00416 return rank + bpData->getPartitionManager();
00417 }
00418 return MyRank;
00419 }
00420
00421 float
00422 PMSTClusterMaker::analyze(const int otherEST) {
00423 return analyzer->analyze(otherEST);
00424 }
00425
00426 void
00427 PMSTClusterMaker::getOwnedESTidx(const int estIdx, int& startIndex,
00428 int& endIndex) {
00429 if (pData->getPartitionManager() == -1) {
00430
00431 startIndex = pData->startESTidx;
00432 endIndex = pData->startESTidx + pData->estCount;
00433 } else {
00434
00435
00436
00437 const int ProcessCount = pData->getWorkerCount() + 1;
00438 int rank = MPI_GET_RANK() - pData->getPartitionManager();
00439 BipartiteData* bpData = static_cast<BipartiteData*> (pData);
00440 int startIdx1 = bpData->partition1->startESTidx;
00441 int startIdx2 = bpData->partition2->startESTidx;
00442 if (estIdx < startIdx2) {
00443
00444 int factor = bpData->partition2->estCount / ProcessCount;
00445 int rem = bpData->partition2->estCount % ProcessCount;
00446 startIndex = startIdx2 + (factor * rank);
00447 endIndex = startIndex + factor;
00448 if (rank == 0) endIndex += rem;
00449 else {
00450 startIndex += rem;
00451 endIndex += rem;
00452 }
00453 } else {
00454
00455 int factor = bpData->partition1->estCount / ProcessCount;
00456 int rem = bpData->partition1->estCount % ProcessCount;
00457 startIndex = startIdx1 + (factor * rank);
00458 endIndex = startIndex + factor;
00459 if (rank == 0) endIndex += rem;
00460 else {
00461 startIndex += rem;
00462 endIndex += rem;
00463 }
00464 }
00465
00466 }
00467 }
00468
00469 void
00470 PMSTClusterMaker::populateCache(const int estIdx, SMList* metricList) {
00471
00472
00473 int startESTidx, endESTidx;
00474 getOwnedESTidx(estIdx, startESTidx, endESTidx);
00475
00476
00477 analyzer->setReferenceEST(estIdx);
00478
00479
00480 SMList smList;
00481 const float InvalidMetric = analyzer->getInvalidMetric();
00482
00483 bool needInvalidMetric = true;
00484
00485 for(int otherIdx = startESTidx; (otherIdx < endESTidx); otherIdx++) {
00486 if ((otherIdx == estIdx) || (cache->isESTinMST(otherIdx)) ||
00487 (EST::getEST(otherIdx)->hasBeenProcessed())) {
00488
00489
00490 continue;
00491 }
00492
00493 const float metric = analyze(otherIdx);
00494 ASSERT ( metric >= 0 );
00495
00496 int alignmentData = 0;
00497 analyzer->getAlignmentData(alignmentData);
00498
00499
00500
00501 const bool isMetricOK= analyzer->compareMetrics(metric, InvalidMetric);
00502 if ((isMetricOK) || (needInvalidMetric)) {
00503
00504 smList.push_back(CachedESTInfo(estIdx, otherIdx,
00505 metric, alignmentData));
00506
00507
00508 needInvalidMetric = (pData->getPartitionManager() != -1) ||
00509 (needInvalidMetric && isMetricOK);
00510 }
00511 }
00512
00513
00514 cache->preprocess(smList);
00515
00516 const int ownerRank = getOwnerProcess(estIdx);
00517 if (ownerRank != MPI_GET_RANK()) {
00518
00519
00520
00521
00522
00523
00524
00525 if (smList.empty()) {
00526 smList.push_back(CachedESTInfo(-1, -1, -1.0f, -1));
00527 }
00528 MPI_SEND(&smList[0], smList.size() * sizeof(CachedESTInfo),
00529 MPI_TYPE_CHAR, ownerRank, SIMILARITY_LIST);
00530
00531
00532 return;
00533 }
00534
00535
00536
00537
00538 cache->mergeList(estIdx, smList);
00539
00540 if (metricList != NULL) {
00541 metricList->insert(metricList->end(), smList.begin(), smList.end());
00542 }
00543
00544
00545
00546
00547 const int LocalManagerRank = pData->getPartitionManager();
00548 if (LocalManagerRank == -1) {
00549
00550 return;
00551 }
00552 const int MyRank = MPI_GET_RANK();
00553 for(int pid = LocalManagerRank;
00554 (pid <= LocalManagerRank+pData->getWorkerCount()); pid++) {
00555 if (pid == MyRank) {
00556
00557 continue;
00558 }
00559
00560 const int rank = (strictOrder ? pid : MPI_ANY_SOURCE);
00561 MPI_STATUS msgInfo;
00562
00563
00564 MPI_PROBE(rank, SIMILARITY_LIST, msgInfo);
00565
00566
00567 const int dataSize = msgInfo.Get_count(MPI_TYPE_CHAR);
00568 SMList remoteList(dataSize / sizeof(CachedESTInfo));
00569
00570
00571
00572 MPI_RECV(&remoteList[0], dataSize, MPI_TYPE_CHAR,
00573 msgInfo.Get_source(), SIMILARITY_LIST);
00574
00575
00576 if (hasValidSMEntry(remoteList)) {
00577 cache->mergeList(estIdx, remoteList);
00578
00579 if (metricList != NULL) {
00580 metricList->insert(metricList->end(), remoteList.begin(),
00581 remoteList.end());
00582 }
00583 }
00584 }
00585
00586
00587
00588 if (MPI_GET_RANK() != LocalManagerRank) {
00589 const int dummy = -1;
00590 MPI_SEND(&dummy, 1, MPI_TYPE_INT, LocalManagerRank,
00591 SIMILARITY_COMPUTATION_DONE);
00592 }
00593 }
00594
00595 void
00596 PMSTClusterMaker::getOwnedPartition() {
00597 const int CommSize = MPI_GET_SIZE();
00598
00599
00600
00601
00602
00603
00604
00605
00606 const int NumPartitions = (int) sqrt((float) CommSize);
00607 const int MyRank = MPI_GET_RANK();
00608 const int ESTsPerPartition = EST::getESTList().size() / NumPartitions;
00609 const int ExtraESTs = EST::getESTList().size() % NumPartitions;
00610 int startESTidx = 0;
00611 int estCount = 0;
00612
00613 if (MyRank < NumPartitions) {
00614
00615 if (MyRank <= ExtraESTs) {
00616
00617
00618
00619 startESTidx = ((ESTsPerPartition + 1) * MyRank);
00620 } else {
00621 startESTidx = MyRank * ESTsPerPartition + ExtraESTs;
00622 }
00623 estCount = ESTsPerPartition;
00624 if (MyRank < ExtraESTs) {
00625
00626
00627 estCount++;
00628 }
00629 pData = new PartitionData(startESTidx, estCount);
00630 } else {
00631
00632
00633
00634
00635 int bpCount = NumPartitions * (NumPartitions-1) / 2;
00636 int bpProcessCount = CommSize - NumPartitions;
00637 int procsPerBP = bpProcessCount / bpCount;
00638 int extraProcs = bpProcessCount % bpCount;
00639 int rank = MyRank - NumPartitions;
00640
00641
00642 if (rank < extraProcs*(procsPerBP+1)) {
00643 procsPerBP += 1;
00644 }
00645 int manager = rank;
00646 int workerCount = procsPerBP - 1;
00647 if ((rank % procsPerBP) != 0) {
00648
00649 manager = rank - (rank % procsPerBP);
00650 }
00651
00652 int bpID = 0;
00653 int id1 = 0;
00654 int id2 = 0;
00655 if (rank >= extraProcs) {
00656 bpID = (rank - extraProcs) / procsPerBP;
00657 } else {
00658 bpID = rank / procsPerBP;
00659 }
00660 id1 = ((bpID) * 2) / NumPartitions;
00661 if (bpID < NumPartitions-1) {
00662 id2 = bpID+1;
00663 } else {
00664 id2 = ((bpID-1) % (NumPartitions-id1)) + id1;
00665 }
00666
00667
00668
00669
00670 if (id1 <= ExtraESTs) {
00671
00672
00673
00674 startESTidx = ((ESTsPerPartition + 1) * id1);
00675 } else {
00676 startESTidx = id1 * ESTsPerPartition + ExtraESTs;
00677 }
00678 estCount = ESTsPerPartition;
00679 if (id1 < ExtraESTs) {
00680
00681
00682 estCount++;
00683 }
00684
00685 int startESTidx2 = 0;
00686 int estCount2 = 0;
00687 if (id2 <= ExtraESTs) {
00688
00689
00690
00691 startESTidx2 = ((ESTsPerPartition + 1) * id2);
00692 } else {
00693 startESTidx2 = id2 * ESTsPerPartition + ExtraESTs;
00694 }
00695 estCount2 = ESTsPerPartition;
00696 if (id2 < ExtraESTs) {
00697
00698
00699 estCount2++;
00700 }
00701
00702
00703 pData = new BipartiteData(startESTidx, estCount+estCount2,
00704 new PartitionData(startESTidx, estCount),
00705 new PartitionData(startESTidx2, estCount2),
00706 manager+NumPartitions, workerCount);
00707
00708
00709 }
00710 }
00711
00712 void
00713 PMSTClusterMaker::displayStats(std::ostream& os) {
00714
00715 cache->displayStats(os, MPI_GET_RANK());
00716
00717 MPIStats::displayStats(os);
00718 }
00719
00720 int
00721 PMSTClusterMaker::mergeManager(MSTCluster& rootCluster, const int threshold) {
00722
00723
00724 PMSTHeapCache* pCache
00725 = new PMSTHeapCache(EST::getESTList().size(), 0,
00726 EST::getESTList().size(), analyzer);
00727
00728
00729 NodeList nodes = mst->getNodes();
00730 SMList smList;
00731
00732
00733 for(NodeList::const_iterator entry = nodes.begin();
00734 (entry != nodes.end()); entry++) {
00735 if (entry->parentIdx != -1) {
00736
00737 smList.push_back(CachedESTInfo(entry->parentIdx, entry->estIdx,
00738 entry->metric, entry->alignmentMetric));
00739 }
00740 }
00741
00742
00743 delete mst;
00744
00745
00746
00747 pCache->mergeList(0, smList);
00748
00749
00750
00751 int numPartitions = (int) sqrt((float) MPI_GET_SIZE());
00752 int numGraphs = numPartitions + ((numPartitions * (numPartitions-1)) / 2);
00753 for (int count = 1; count < numGraphs; count++) {
00754 MPI_STATUS msgInfo;
00755
00756
00757 MPI_PROBE(MPI_ANY_SOURCE, SIMILARITY_LIST, msgInfo);
00758
00759
00760 const int dataSize = msgInfo.Get_count(MPI_TYPE_CHAR);
00761 SMList remoteList(dataSize / sizeof(CachedESTInfo));
00762
00763
00764
00765 MPI_RECV(&remoteList[0], dataSize, MPI_TYPE_CHAR,
00766 msgInfo.Get_source(), SIMILARITY_LIST);
00767
00768
00769 pCache->mergeList(0, remoteList);
00770 }
00771
00772
00773
00774
00775 int pendingESTs = EST::getESTList().size();
00776
00777
00778 int* parent = new int[pendingESTs];
00779 bool* root = new bool[pendingESTs];
00780 for (int i = 0; i < pendingESTs; i++) {
00781 root[i] = true;
00782 parent[i] = 1;
00783 }
00784
00785
00786 int dummy;
00787 mst = new MST(pendingESTs, analyzer->getAlignmentData(dummy));
00788
00789 int parentESTidx = -1;
00790 int estToAdd = refESTidx;
00791 float metric = 0;
00792 int alignmentInfo = 0;
00793
00794 pCache->popBestEntry(parentESTidx, estToAdd, metric, alignmentInfo);
00795
00796
00797
00798
00799
00800 mst->addNode(-1, parentESTidx, 0, 0);
00801 pendingESTs--;
00802
00803 int i = parentESTidx;
00804 int j = estToAdd;
00805
00806 bool reachedThreshold = false;
00807
00808 do {
00809 if (!reachedThreshold && metric > threshold) {
00810
00811
00812 reachedThreshold = true;
00813
00814 rootCluster.makeMergedClusters(EST::getESTList().size(),
00815 parent, root);
00816 }
00817
00818 mst->addNode(parentESTidx, estToAdd, metric, alignmentInfo);
00819 if (--pendingESTs == 0) {
00820
00821
00822 break;
00823 }
00824
00825
00826 if (parent[i] < parent[j]) {
00827 parent[j] += parent[i];
00828 root[i] = false;
00829 parent[i] = j;
00830 } else {
00831 parent[i] += parent[j];
00832 root[j] = false;
00833 parent[j] = i;
00834 }
00835
00836
00837
00838 do {
00839
00840 pCache->popBestEntry(parentESTidx, estToAdd, metric,
00841 alignmentInfo);
00842 i = parentESTidx;
00843 j = estToAdd;
00844 int itemp = parentESTidx;
00845 int jtemp = estToAdd;
00846 while (!root[i]) {
00847 i = parent[i];
00848 }
00849
00850 while (itemp != i) {
00851 int next = parent[itemp];
00852 parent[itemp] = i;
00853 itemp = next;
00854 }
00855 while (!root[j]) {
00856 j = parent[j];
00857 }
00858
00859 while (jtemp != j) {
00860 int next = parent[jtemp];
00861 parent[jtemp] = j;
00862 jtemp = next;
00863 }
00864 } while (i == j);
00865
00866 ASSERT( parentESTidx != -1 );
00867 ASSERT( estToAdd != -1 );
00868
00869 } while (pendingESTs > 0);
00870
00871
00872 delete [] parent;
00873 delete [] root;
00874 delete pCache;
00875
00876
00877 return NO_ERROR;
00878 }
00879
00880 int
00881 PMSTClusterMaker::mergeWorker() {
00882
00883 NodeList nodes = mst->getNodes();
00884 SMList smList;
00885
00886
00887 for(NodeList::const_iterator entry = nodes.begin();
00888 (entry != nodes.end()); entry++) {
00889 if (entry->parentIdx != -1) {
00890
00891 smList.push_back(CachedESTInfo(entry->parentIdx, entry->estIdx,
00892 entry->metric, entry->alignmentMetric));
00893 }
00894 }
00895
00896
00897 MPI_SEND(&smList[0], smList.size() * sizeof(CachedESTInfo),
00898 MPI_TYPE_CHAR, MANAGER_RANK, SIMILARITY_LIST);
00899
00900
00901 delete mst;
00902 mst = NULL;
00903
00904
00905 return NO_ERROR;
00906 }
00907
00908 int
00909 PMSTClusterMaker::makeClusters() {
00910 int result = NO_ERROR;
00911
00912 MSTCluster root;
00913
00914
00915
00916 if ((result = analyzer->initialize()) != NO_ERROR) {
00917
00918 return result;
00919 }
00920
00921 int totalSuccesses = 0;
00922
00923 if (inputMSTFile == NULL) {
00924
00925
00926
00927
00928
00929
00930
00931
00932
00933
00934
00935 getOwnedPartition();
00936 if (!strcmp(cacheType, "mlist")) {
00937 cache = new MSTMultiListCache(EST::getESTList().size(),
00938 pData->startESTidx,
00939 pData->ownedESTCount, analyzer,
00940 !noCacheRepop, cacheSize);
00941 } else {
00942 cache = new MSTHeapCache(EST::getESTList().size(),
00943 pData->startESTidx,
00944 pData->ownedESTCount, analyzer,
00945 !noCacheRepop, cacheSize);
00946 }
00947
00948
00949
00950 if (pData->getPartitionManager() == -1 ||
00951 pData->getPartitionManager() == MPI_GET_RANK()) {
00952
00953 result = manager();
00954 } else {
00955
00956 result = worker();
00957 }
00958
00959
00960
00961 int threshold = 40;
00962 Heuristic* tv = (HeuristicChain::getHeuristicChain())
00963 ->getHeuristic("tv");
00964 if (tv != NULL) {
00965
00966 int tvSuccesses = tv->getSuccessCount();
00967
00968 totalSuccesses = tvSuccesses;
00969
00970
00971 if (MPI_GET_RANK() == MANAGER_RANK) {
00972 for (int i = 1; i < MPI_GET_SIZE(); i++) {
00973 TRACK_IDLE_TIME(MPI_RECV(&tvSuccesses, 1, MPI_TYPE_INT,
00974 MPI_ANY_SOURCE,
00975 COMPUTE_TOTAL_ANALYSIS_COUNT));
00976 totalSuccesses+=tvSuccesses;
00977 }
00978
00979 threshold = (int) root.calculateThreshold(EST::getESTList().size(),
00980 percentile, totalSuccesses, analyzer);
00981 } else {
00982
00983 MPI_SEND(&tvSuccesses, 1, MPI_TYPE_INT, MANAGER_RANK,
00984 COMPUTE_TOTAL_ANALYSIS_COUNT);
00985 }
00986 }
00987
00988
00989
00990
00991
00992
00993 std::ostringstream buffer;
00994 displayStats(buffer);
00995 std::cout << buffer.str() << std::endl;
00996
00997 delete cache;
00998
00999
01000
01001
01002 if (MPI_GET_RANK() == MANAGER_RANK) {
01003
01004 result = mergeManager(root, threshold);
01005 } else if (pData->getPartitionManager() == -1 ||
01006 pData->getPartitionManager() == MPI_GET_RANK()) {
01007
01008
01009 result = mergeWorker();
01010 }
01011
01012 delete pData;
01013 } else {
01014
01015
01016 mst = MST::deSerialize(inputMSTFile);
01017 }
01018
01019 if ((result != NO_ERROR) || (mst == NULL)) {
01020
01021 return result;
01022 }
01023 ASSERT ( mst != NULL );
01024
01025 if ((outputMSTFile != NULL) && (mst != NULL)) {
01026 mst->serialize(outputMSTFile, (inputMSTFile != NULL) ? inputMSTFile :
01027 analyzer->getInputFileName());
01028 }
01029
01030 if (!dontCluster) {
01031
01032
01033
01034 std::ofstream outFile;
01035 if (!outputFileName.empty()) {
01036 outFile.open(outputFileName.c_str());
01037 if (!outFile.good()) {
01038 std::cerr << "Error opening output file " << outputFileName
01039 << " for writing cluster data.\n"
01040 << "Cluster data will be dumped on stdout.\n";
01041 }
01042 }
01043 if (prettyPrint) {
01044 root.printClusterTree((outFile.is_open() && outFile.good()) ?
01045 outFile : std::cout);
01046 } else {
01047
01048 ((outFile.is_open() && outFile.good()) ? outFile : std::cout)
01049 << root << std::endl;
01050 }
01051 }
01052
01053 return result;
01054 }
01055
01056 void
01057 PMSTClusterMaker::sendToWorkers(int data, const int tag) const {
01058 if (pData->getPartitionManager() == MPI_GET_RANK()) {
01059 const int MyRank = MPI_GET_RANK();
01060 for (int i = MyRank+1; i <= MyRank+pData->getWorkerCount(); i++) {
01061 MPI_SEND(&data, 1, MPI_TYPE_INT, i, tag);
01062 }
01063 }
01064 }
01065
01066 bool
01067 PMSTClusterMaker::hasValidSMEntry(const SMList& list) const {
01068 if (list.empty()) {
01069
01070
01071 return false;
01072 }
01073 if (list.size() > 1) {
01074
01075
01076 return true;
01077 }
01078 if (list[0].estIdx == -1) {
01079
01080
01081 return false;
01082 }
01083
01084 return true;
01085 }
01086
01087 #endif