/* paraHub - Parasol hub server. This is the heart of the parasol system
 * and consists of several threads - socketSucker, heartbeat, a collection
 * of spokes, as well as the main hub thread. The system is synchronized
 * around a message queue that the hub reads and the other threads write.
 *
 * The purpose of socketSucker is to move messages from the UDP
 * socket, which has a limited queue size, to the message queue, which
 * can be much larger. The spoke daemons exist to send messages to compute
 * nodes. Since sending a message to a node can take a while depending on
 * the network conditions, the multiple spokes allow the system to be
 * delivering messages to multiple nodes simultaneously. The heartbeat
 * daemon simply sits in a loop adding a heartbeat message to the message
 * queue every 15 seconds or so. The hub thread is responsible for
 * keeping track of everything.
 *
 * The hub keeps track of users, batches, jobs, and machines. It tries
 * to balance machine usage between users and between batches. If a machine
 * goes down it will restart the jobs the machine was running on other machines.
 * When a job finishes it will add a line about the job to the results file
 * associated with the batch.
 *
 * A fair bit of the hub's code is devoted to scheduling. It does this by
 * periodically "planning" what batches to associate with what machines.
 * When a machine is free it will run the next job from one of its batches.
 * A number of events including a new batch of jobs, machines being added or
 * removed, and so forth can make the system decide it needs to replan. The
 * replanning itself is done in the next heartbeat.
 *
 * When the plan is in place, the most common thing the system does is
 * try to run the next job. It keeps lists of free machines and free spokes,
 * and for the most part just takes the next machine, a job from one
 * of the batches the machine is running, and the next free spoke, and sends
 * a message to the machine via the spoke to run the job.
 * This indirection of starting jobs via a separate spoke process avoids the
 * hub daemon itself having to wait for a response from a compute node
 * over the network.
 *
 * When a spoke is done assigning a job, the spoke sends a 'recycleSpoke'
 * message to the hub, which puts the spoke back on the freeSpoke list.
 * Likewise when a job is done the machine running the job sends a
 * 'job done' message to the hub, which puts the machine back on the
 * free list, writes the job exit code to a file, and removes the job
 * from the system.
 *
 * Sometimes a spoke will find that a machine is down. In this case it
 * sends a 'node down' message to the hub as well as the 'spoke free'
 * message. The hub will then move the machine to the deadMachines list,
 * and put the job back on the top of the pending list.
 *
 * The heartbeat messages stimulate the hub to do various background
 * chores. When the hub gets a heartbeat message it
 * does a few things:
 *     o - It calls runner to try and start some more jobs. (Runner
 *         is also called at the end of processing a recycleSpoke,
 *         jobDone, addJob or addMachine message. Typically runner
 *         won't find anything new to run in the heartbeat, but this
 *         is put here mostly just in case of unforeseen issues.)
 *     o - It calls graveDigger, a routine which sees if machines
 *         on the dead list have come back to life.
 *     o - It calls hangman, a routine which sees if jobs the system
 *         thinks have been running for a long time are still
 *         running on the machine they have been assigned to.
 *         If the machine has gone down it is moved to the dead list
 *         and the job is reassigned.
*/ #include "paraCommon.h" #include "options.h" #include "linefile.h" #include "hash.h" #include "errabort.h" #include "dystring.h" #include "dlist.h" #include "net.h" #include "internet.h" #include "paraHub.h" #include "machSpec.h" #include "log.h" #include "obscure.h" #include "sqlNum.h" /* command line option specifications */ static struct optionSpec optionSpecs[] = { {"spokes", OPTION_INT}, {"jobCheckPeriod", OPTION_INT}, {"machineCheckPeriod", OPTION_INT}, {"subnet", OPTION_STRING}, {"nextJobId", OPTION_INT}, {"logFacility", OPTION_STRING}, {"logMinPriority", OPTION_STRING}, {"log", OPTION_STRING}, {"debug", OPTION_BOOLEAN}, {"noResume", OPTION_BOOLEAN}, {"ramUnit", OPTION_STRING}, {"defaultJobRam", OPTION_INT}, {NULL, 0} }; char *version = PARA_VERSION; /* Version number. */ /* Some command-line configurable quantities and their defaults. */ int jobCheckPeriod = 10; /* Minutes between checking running jobs. */ int machineCheckPeriod = 20; /* Minutes between checking dead machines. */ int assumeDeadPeriod = 60; /* If haven't heard from job in this long assume * machine running it is dead. */ int initialSpokes = 30; /* Number of spokes to start with. */ unsigned char hubSubnet[4] = {255,255,255,255}; /* Subnet to check. */ int nextJobId = 0; /* Next free job id. */ time_t startupTime; /* Clock tick of paraHub startup. */ /* not yet configurable */ int sickNodeThreshold = 3; /* Treat node as sick if this number of failures */ int sickBatchThreshold = 25; /* Auto-chill sick batch if this number of continuous failures */ void usage() /* Explain usage and exit. 
*/ { errAbort("paraHub - parasol hub server version %s\n" "usage:\n" " paraHub machineList\n" "Where machine list is a file with the following columns:\n" " name - Network name\n" " cpus - Number of CPUs we can use\n" " ramSize - Megabytes of memory\n" " tempDir - Location of (local) temp dir\n" " localDir - Location of local data dir\n" " localSize - Megabytes of local disk\n" " switchName - Name of switch this is on\n" "\n" "options:\n" " -spokes=N Number of processes that feed jobs to nodes - default %d.\n" " -jobCheckPeriod=N Minutes between checking on job - default %d.\n" " -machineCheckPeriod=N Minutes between checking on machine - default %d.\n" " -subnet=XXX.YYY.ZZZ Only accept connections from subnet (example 192.168).\n" " -nextJobId=N Starting job ID number.\n" " -logFacility=facility Log to the specified syslog facility - default local0.\n" " -logMinPriority=pri minimum syslog priority to log, also filters file logging.\n" " defaults to \"warn\"\n" " -log=file Log to file instead of syslog.\n" " -debug Don't daemonize\n" " -noResume Don't try to reconnect with jobs running on nodes.\n" " -ramUnit=N Number of bytes of RAM in the base unit used by the jobs.\n" " Default is RAM on node divided by number of cpus on node.\n" " Shorthand expressions allow t,g,m,k for tera, giga, mega, kilo.\n" " e.g. 4g = 4 Gigabytes.\n" " -defaultJobRam=N Number of ram units in a job has no specified ram usage.\n" " Defaults to 1.\n" , version, initialSpokes, jobCheckPeriod, machineCheckPeriod ); } struct spoke *spokeList; /* List of all spokes. */ struct dlList *freeSpokes; /* List of free spokes. */ struct dlList *busySpokes; /* List of busy spokes. */ struct dlList *deadSpokes; /* List of dead spokes. */ struct machine *machineList; /* List of all machines. */ struct dlList *freeMachines; /* List of machines idle. */ struct dlList *readyMachines; /* List of machines ready for jobs. */ struct dlList *blockedMachines; /* List of machines ready but blocked by runningCount. 
*/ struct dlList *busyMachines; /* List of machines running jobs. */ struct dlList *deadMachines; /* List of machines that aren't running. */ struct dlList *runningJobs; /* Jobs that are running. Preserves oldest first order. */ struct dlList *hangJobs; /* Jobs running hang check list. */ struct hash *userHash; /* Hash of all users. */ struct user *userList; /* List of all users. */ struct batch *batchList; /* List of all batches. */ struct dlList *queuedUsers; /* Users with jobs in queue. */ struct dlList *unqueuedUsers; /* Users with no jobs in queue. */ struct hash *machineHash; /* Find if machine exists already */ struct hash *stringHash; /* Unique strings throughout system go here * including directory names and results file * names/batch names. */ struct resultQueue *resultQueues; /* Result files. */ int finishedJobCount = 0; /* Number of finished jobs. */ int crashedJobCount = 0; /* Number of crashed jobs. */ char *jobIdFileName = "parasol.jid"; /* File name where jobId file is. */ FILE *jobIdFile = NULL; /* Handle to jobId file. */ char *hubHost; /* Name of machine running this. */ struct rudp *rudpOut; /* Our rUDP socket. */ /* Variables for new scheduler */ // TODO make commandline param options to override defaults for unit sizes? 
/* using machines list spec info for defaults */
int cpuUnit = 1;                        /* 1 CPU */ /* someday this could be float 0.5 */
long long ramUnit = 512 * 1024 * 1024;  /* 500 MB */
int defaultJobCpu = 1;  /* number of cpuUnits in default job usage */
int defaultJobRam = 1;  /* number of ramUnits in default job usage */

/* for the resource array dimensions */
int maxCpuInCluster = 0;  /* node with largest number of cpu units */
int maxRamInCluster = 0;  /* node with largest number of ram units */
struct slRef ***perCpu = NULL;  /* an array of resources sharing the same cpu units free units count */

boolean needsPlanning = FALSE;  /* remember if situation changed, need new plan */

void setupLists()
/* Make up machine, spoke, user and job lists - all doubly linked
 * so it is fast to remove items from one list and put them
 * on another. */
{
freeMachines = newDlList();
readyMachines = newDlList();
blockedMachines = newDlList();
busyMachines = newDlList();
deadMachines = newDlList();
runningJobs = newDlList();
hangJobs = newDlList();
freeSpokes = newDlList();
busySpokes = newDlList();
deadSpokes = newDlList();
queuedUsers = newDlList();
unqueuedUsers = newDlList();
userHash = newHash(6);
}

int avgBatchTime(struct batch *batch)
/* Return average time per finished job in batch (doneTime/doneCount),
 * or 0 if no jobs have finished yet. */
{
if (batch->doneCount == 0)
    return 0;
return batch->doneTime / batch->doneCount;
}

boolean nodeSickOnAllBatches(struct user *user, char *machineName)
/* Return true if all of a user's current batches believe the machine is sick. */
{
struct dlNode *node = user->curBatches->head;
if (dlEnd(node))
    return FALSE;   /* no current batches - no evidence the node is sick */
for (; !dlEnd(node); node = node->next)
    {
    struct batch *batch = node->val;
    /* does any batch think the node is not sick? */
    if (hashIntValDefault(batch->sickNodes, machineName, 0) < sickNodeThreshold)
        {
        return FALSE;
        }
    }
return TRUE;
}

void updateUserSickNode(struct user *user, char *machineName)
/* If all of a user's batches reject a sick machine, then the user rejects it.
 */
{
boolean allSick = nodeSickOnAllBatches(user, machineName);
if (allSick)
    hashStore(user->sickNodes, machineName);
else
    hashRemove(user->sickNodes, machineName);
}

void updateUserSickNodes(struct user *user)
/* Update user sickNodes.  A node is only sick if all batches call it sick.
 * Rebuilds user->sickNodes from scratch, seeded by the first current batch. */
{
struct dlNode *node;
struct batch *batch;
hashFree(&user->sickNodes);
user->sickNodes = newHashExt(6, FALSE);
node = user->curBatches->head;
if (!dlEnd(node))
    {
    batch = node->val;
    struct hashEl *el, *list = hashElListHash(batch->sickNodes);
    for (el = list; el != NULL; el = el->next)
        {
        updateUserSickNode(user, el->name);
        }
    hashElFreeList(&list);
    }
}

boolean userIsActive(struct user *user)
/* Return TRUE if user has jobs running or in queue */
{
return user->runningCount > 0 || !dlEmpty(user->curBatches);
}

int listSickNodes(struct paraMessage *pm)
/* find nodes that are sick for all active users.
 * If pm is non-NULL, also send each sick node name (and a summary line)
 * back over rUDP.  Returns the count of universally-sick nodes. */
{
int sickNodeCount = 0, userCount = 0;
struct user *user;
if (userList)
    {
    struct hashEl *el, *list = NULL;
    /* get list from an active user if any, and get active-users count */
    for (user = userList; user != NULL; user = user->next)
        {
        if (userIsActive(user))
            {
            ++userCount;
            if (!list)
                list = hashElListHash(user->sickNodes);
            }
        }
    if (list)
        {
        for (el = list; el != NULL; el = el->next)
            {
            boolean allSick = TRUE;
            /* node counts only if every active user calls it sick */
            for (user = userList; user != NULL; user = user->next)
                {
                if (userIsActive(user))
                    if (!hashLookup(user->sickNodes, el->name))
                        allSick = FALSE;
                }
            if (allSick)
                {
                ++sickNodeCount;
                if (pm)
                    {
                    pmClear(pm);
                    pmPrintf(pm, "%s", el->name);
                    pmSend(pm, rudpOut);
                    }
                }
            }
        hashElFreeList(&list);
        }
    }
if (sickNodeCount > 0)
    {
    if (pm)
        {
        pmClear(pm);
        pmPrintf(pm, "Strength of evidence: %d users", userCount);
        pmSend(pm, rudpOut);
        }
    }
if (pm)
    pmSendString(pm, rudpOut, "");   /* empty string terminates the reply */
return sickNodeCount;
}

void updateUserMaxJob(struct user *user)
/* Update user maxJob.
 * >=0 only if all batches have >=0 maxJob values */
{
/* Note - at this point the user->maxJob is mostly ornamental,
 * it has been left in for people who want to see it in list users */
struct dlNode *node;
struct batch *batch;
boolean unlimited = FALSE;
user->maxJob = 0;
for (node = user->curBatches->head; !dlEnd(node); node = node->next)
    {
    batch = node->val;
    if (batch->maxJob >= 0)
        user->maxJob += batch->maxJob;
    else
        unlimited = TRUE;
    }
if (unlimited)
    user->maxJob = -1;   /* any unlimited batch makes the user unlimited */
}

void updateUserPriority(struct user *user)
/* Update user priority.  Equals minimum of current batch priorities */
{
struct dlNode *node;
struct batch *batch;
user->priority = MAX_PRIORITY;
for (node = user->curBatches->head; !dlEnd(node); node = node->next)
    {
    batch = node->val;
    if (batch->priority < user->priority)
        user->priority = batch->priority;
    }
}

struct batch *findBatchInList(struct dlList *list, char *nameString)
/* Find a batch of jobs in list or return NULL.
 * nameString must be from stringHash. */
{
struct dlNode *node;
for (node = list->head; !dlEnd(node); node = node->next)
    {
    struct batch *batch = node->val;
    /* pointer comparison is intentional - both names are interned in stringHash */
    if (nameString == batch->name)
        return batch;
    }
return NULL;
}

struct batch *newBatch(char *nameString, struct user *user)
/* Make new batch.  NameString must be in stringHash already */
{
struct batch *batch;
AllocVar(batch);
slAddHead(&batchList, batch);
AllocVar(batch->node);
batch->node->val = batch;
batch->name = nameString;
batch->user = user;
batch->jobQueue = newDlList();
batch->priority = NORMAL_PRIORITY;
batch->maxJob = -1;   /* -1 means unlimited */
batch->sickNodes = newHashExt(6, FALSE);
batch->cpu = defaultJobCpu;  /* number of cpuUnits in default job usage */
batch->ram = defaultJobRam;  /* number of ramUnits in default job usage */
needsPlanning = TRUE;
return batch;
}

struct batch *findBatch(struct user *user, char *name, boolean holding)
/* Find batch of jobs.  If no such batch yet make it.
 */
{
struct batch *batch;
name = hashStoreName(stringHash, name);
batch = findBatchInList(user->curBatches, name);
if (batch == NULL)
    {
    batch = findBatchInList(user->oldBatches, name);
    if (batch != NULL)
        dlRemove(batch->node);
    else
        batch = newBatch(name, user);
    if (holding && dlEmpty(batch->jobQueue))
        /* setPriority must not release batch if jobs not yet pushed */
        dlAddTail(user->oldBatches, batch->node);
    else
        dlAddTail(user->curBatches, batch->node);
    needsPlanning = TRUE;
    updateUserPriority(user);
    updateUserMaxJob(user);
    updateUserSickNodes(user);
    }
return batch;
}

struct user *findUser(char *name)
/* Find user.  If it's the first time we've seen this
 * user then make up a user object and put it on the
 * idle user list. */
{
struct user *user = hashFindVal(userHash, name);
if (user == NULL)
    {
    AllocVar(user);
    slAddHead(&userList, user);
    hashAddSaveName(userHash, name, user, &user->name);
    AllocVar(user->node);
    user->node->val = user;
    dlAddTail(unqueuedUsers, user->node);
    user->curBatches = newDlList();
    user->oldBatches = newDlList();
    user->sickNodes = newHashExt(6, FALSE);
    }
return user;
}

int userQueuedCount(struct user *user)
/* Count up jobs user has waiting */
{
struct dlNode *node;
struct batch *batch;
int count = 0;
for (node = user->curBatches->head; !dlEnd(node); node = node->next)
    {
    batch = node->val;
    count += batch->queuedCount;
    }
return count;
}

struct batch *findLuckyBatch(struct user *user)
/* Find the batch that gets to run a job - the planning batch
 * with the lowest planScore. */
{
struct batch *minBatch = NULL;
int minScore = BIGNUM;
struct dlNode *node;
for (node = user->curBatches->head; !dlEnd(node); node = node->next)
    {
    struct batch *batch = node->val;
    if (batch->planning)
        {
        if (batch->planScore < minScore)
            {
            minScore = batch->planScore;
            minBatch = batch;
            }
        }
    }
return minBatch;
}

struct user *findLuckyUser()
/* Find lucky user who gets to run a job.
 * The queued user with batches still in planning and lowest planScore. */
{
struct user *minUser = NULL;
int minScore = BIGNUM;
struct dlNode *node;
for (node = queuedUsers->head; !dlEnd(node); node = node->next)
    {
    struct user *user = node->val;
    if (user->planningBatchCount > 0)
        {
        if (user->planScore < minScore)
            {
            minScore = user->planScore;
            minUser = user;
            }
        }
    }
return minUser;
}

void resetBatchesForPlanning(struct user *user)
/* Initialize batches for given user for planning.*/
{
struct dlNode *node;
for (node = user->curBatches->head; !dlEnd(node); node = node->next)
    {
    struct batch *batch = node->val;
    batch->planning = TRUE;
    batch->planCount = 0;
    /* adding 1 to planCount helps suppress running any jobs when priority is set very high */
    batch->planScore = 1 * batch->priority;
    if (batch->maxJob == 0)
        batch->planning = FALSE;   /* maxJob 0 means nothing may be planned */
    if (batch->planning)
        {
        ++user->planningBatchCount;
        }
    }
}

void resetUsersForPlanning()
/* Initialize users for planning. */
{
struct dlNode *node;
for (node = queuedUsers->head; !dlEnd(node); node = node->next)
    {
    struct user *user = node->val;
    user->planCount = 0;
    user->planningBatchCount = 0;
    updateUserPriority(user);
    updateUserMaxJob(user);
    updateUserSickNodes(user);
    /* adding 1 to planCount helps suppress running any jobs when priority is set very high */
    user->planScore = 1 * user->priority;
    resetBatchesForPlanning(user);
    }
}

void unactivateBatchIfEmpty(struct batch *batch)
/* If job queue on batch is empty then remove batch from
 * user's active batch list, and possibly user from active
 * user list.
 */
{
if (dlEmpty(batch->jobQueue))
    {
    struct user *user = batch->user;
    batch->queuedCount = 0;
    dlRemove(batch->node);
    dlAddTail(user->oldBatches, batch->node);
    batch->planCount = 0;   /* use as a signal that it's not active any more */
    needsPlanning = TRUE;   /* remember if situation changed, need new plan */
    updateUserPriority(user);
    updateUserMaxJob (user);
    updateUserSickNodes(user);
    /* Check if it's last user batch and if so take them off queue */
    if (dlEmpty(user->curBatches))
        {
        dlRemove(user->node);
        dlAddTail(unqueuedUsers, user->node);
        }
    }
}

void readTotalMachineResources(struct machine *machine, int *cpuReturn, int *ramReturn)
/* Return in units the cpu and ram resources of given machine */
{
int c = 0, r = 0;
c = machine->machSpec->cpus / cpuUnit;
/* ramSize is in megabytes; convert to bytes before dividing by ramUnit */
r = ((long long)machine->machSpec->ramSize * 1024 * 1024) / ramUnit;
*cpuReturn = c;
*ramReturn = r;
}

void readRemainingMachineResources(struct machine *machine, int *cpuReturn, int *ramReturn)
/* Calculate available cpu and ram resources in given machine */
{
int c = 0, r = 0;
readTotalMachineResources(machine, &c, &r);
/* subtract all the resources now in-use */
struct dlNode *jobNode = NULL;
for (jobNode = machine->jobs->head; !dlEnd(jobNode); jobNode = jobNode->next)
    {
    struct job *job = jobNode->val;
    struct batch *batch = job->batch;
    c -= batch->cpu;
    r -= batch->ram;
    }
*cpuReturn = c;
*ramReturn = r;
}

struct batch *findRunnableBatch(struct machine *machine, struct slRef **pEl, boolean *pCouldRun)
/* Search machine for runnable batch, preferable something not at maxJob */
{
int c = 0, r = 0;
readRemainingMachineResources(machine, &c, &r);
struct slRef *el;
for(el = machine->plannedBatches; el; el=el->next)
    {
    struct batch *batch = el->val;
    /* Prevent too many from this batch from running.
     * This is helpful for keeping the balance with longrunning batches
     * and maxJob.
     */
    if (batch->cpu <= c && batch->ram <= r)
        {
        if (pCouldRun)
            *pCouldRun = TRUE;   /* resources fit, even if runningCount blocks it */
        if (batch->runningCount < batch->planCount)
            {
            if (pEl)
                *pEl = el;
            return batch;
            }
        }
    }
if (pEl)
    *pEl = NULL;
return NULL;
}

int scoreCost(struct batch *batch)
/* calculate score cost of using resources */
{
return max(batch->cpu * defaultJobRam, batch->ram * defaultJobCpu);
}

void allocateResourcesToMachine(struct machine *mach, struct batch *batch, struct user *user, int *pC, int *pR)
/* Allocate Resources to machine.  Deducts batch's cpu/ram units from *pC/*pR,
 * bumps plan counts and scores, and records the batch on mach->plannedBatches. */
{
*pC -= batch->cpu;
*pR -= batch->ram;
++batch->planCount;
++user->planCount;
/* incrementally update score for batches and users */
/* scoring that accounts the resources carefully, e.g. actual ram and cpu. */
int cost = scoreCost(batch);
batch->planScore += cost * batch->priority;
user->planScore += cost * user->priority;
/* add batch to plannedBatches queue */
refAdd(&mach->plannedBatches, batch);
/* maxJob handling */
if ((batch->maxJob!=-1) && (batch->planCount >= batch->maxJob))
    {
    /* remove batch from the allocating */
    batch->planning = FALSE;
    --user->planningBatchCount;
    }
}

void plan(struct paraMessage *pm)
/* Make a new plan allocating resources to batches.
 * If pm is non-NULL, progress/diagnostic lines are sent back over rUDP. */
{
logDebug("executing new plan");
if (pm)
    {
    pmClear(pm);
    pmPrintf(pm, "cpuUnit=%d, ramUnit=%lld", cpuUnit, ramUnit);
    pmSend(pm, rudpOut);
    pmClear(pm);
    pmPrintf(pm, "job default units: Cpu=%d, ram=%d", defaultJobCpu, defaultJobRam);
    pmSend(pm, rudpOut);
    pmClear(pm);
    pmPrintf(pm, "max cluster units: Cpu=%d, ram=%d", maxCpuInCluster, maxRamInCluster);
    pmSend(pm, rudpOut);
    pmSendString(pm, rudpOut, "-----");
    }
//if (pm) pmSendString(pm, rudpOut, "about to initialize cpu/ram 2d arrays");
/* Initialize Resource Arrays for CPU and RAM */
/* allocate memory like a 2D array */
int c = 0, r = 0;
/* +1 to allow for zero slot simplifies the code */
AllocArray(perCpu, maxCpuInCluster+1);
for (c = 1; c <= maxCpuInCluster; ++c)
    AllocArray(perCpu[c], maxRamInCluster+1);
//if (pm) pmSendString(pm, rudpOut, "about to add machines resources to cpu/ram arrays");
resetUsersForPlanning();
/* allocate machines to resource lists */
struct machine *mach;
for (mach = machineList; mach != NULL; mach = mach->next)
    {
    slFreeList(&mach->plannedBatches);   // free any from last plan
    if (!mach->isDead)
        {
        readTotalMachineResources(mach, &c, &r);
        /* Sweep mark all running jobs as oldPlan,
         * this helps us deal with jobsDone from old plan.
         * For better handling of long-running maxJob batches
         * with frequent replanning,
         * preserve the same resources on the same machines. */
        struct dlNode *jobNode = NULL;
        for (jobNode = mach->jobs->head; !dlEnd(jobNode); jobNode = jobNode->next)
            {
            struct job *job = jobNode->val;
            struct batch *batch = job->batch;
            struct user *user = batch->user;
            job->oldPlan = TRUE;
            if (batch->planning && (batch->maxJob != -1))
                {
                if (pm)
                    {
                    //pmClear(pm);
                    //pmPrintf(pm, "preserving batch %s on machine %s", batch->name, mach->name);
                    //pmSend(pm, rudpOut);
                    }
                allocateResourcesToMachine(mach, batch, user, &c, &r);
                }
            }
        if (pm)
            {
            //pmClear(pm);
            //pmPrintf(pm, "machSpec (%s) cpus:%d ramSize=%d"
            //, mach->name, mach->machSpec->cpus, mach->machSpec->ramSize);
            //pmSend(pm, rudpOut);
            }
        if (c < 1 || r < 1)
            {
            /* machine is already full - leave it out of the resource array */
            if (pm)
                {
                //pmClear(pm);
                //pmPrintf(pm, "IGNORING mach: %s c=%d cpu units; r=%d ram units", mach->name, c, r);
                //pmSend(pm, rudpOut);
                }
            }
        else
            {
            if (pm)
                {
                pmClear(pm);
                pmPrintf(pm, "mach: %s c=%d cpu units; r=%d ram units", mach->name, c, r);
                pmSend(pm, rudpOut);
                }
            refAdd(&perCpu[c][r], mach);
            }
        }
    }
/* allocate machines to resource lists */
while(TRUE)
    {
    /* find lucky user/batch */
    struct user *user = findLuckyUser();
    if (!user)
        break;
    struct batch *batch = findLuckyBatch(user);
    if (!batch)
        {
        errAbort("unexpected error: batch not found while planning for lucky user");
        break;
        }
    if (pm)
        {
        //pmClear(pm);
        //pmPrintf(pm, "lucky user: %s; lucky batch=%s", user->name, batch->name);
        //pmSend(pm, rudpOut);
        }
    /* find machine with adequate resources in resource array (if any) */
    boolean found = FALSE;
    struct slRef **perRam = NULL;
    struct slRef *el = NULL;
    for (c = batch->cpu; c <= maxCpuInCluster; ++c)
        {
        /* an array of resources sharing the same cpu and ram free units count */
        perRam = perCpu[c];
        for (r = batch->ram; r <= maxRamInCluster; ++r)
            {
            if (perRam[r])
                {
                /* avoid any machine in the sickNodes */
                /* extract from list if found */
                el = perRam[r];
                struct slRef **listPt = &perRam[r];
                while (el)
                    {
                    mach = (struct machine *) el->val;
                    if (hashIntValDefault(batch->sickNodes, mach->name, 0) < sickNodeThreshold)
                        {
                        found = TRUE;
                        *listPt = el->next;   /* unlink el from the bucket */
                        el->next = NULL;
                        break;
                        }
                    listPt = &el->next;
                    el = el->next;
                    }
                }
            if (found)
                break;   // preserve value of r
            }
        if (found)
            break;   // preserve value of c
        }
    if (found)
        {
        /* allocate plan, reduce resources, calc new resources and pos.
         * move machine from old array pos to new pos. (slPopHead, slAddHead)
         * update its stats, and if heaps, update heaps. */
        if (pm)
            {
            //pmClear(pm);
            //pmPrintf(pm, "found hardware cpu %d ram %d in machine %s c=%d r=%d batch=%s",
            //batch->cpu, batch->ram, mach->name, c, r, batch->name);
            //pmSend(pm, rudpOut);
            }
        allocateResourcesToMachine(mach, batch, user, &c, &r);
        if (pm)
            {
            //pmClear(pm);
            //pmPrintf(pm, "remaining hardware c=%d r=%d", c, r);
            //pmSend(pm, rudpOut);
            }
        if (c < 1 || r < 1)
            freeMem(el);   /* this node has insufficient resources remaining */
        else
            slAddHead(&perCpu[c][r], el);
        }
    else
        {
        if (pm)
            {
            //pmClear(pm);
            //pmPrintf(pm, "no suitable machines left, removing from planning: user %s; lucky batch %s",
            //user->name, batch->name);
            //pmSend(pm, rudpOut);
            }
        /* no suitable machine found */
        /* remove batch from the allocating */
        batch->planning = FALSE;
        --user->planningBatchCount;
        }
    }
/* free arrays when finished */
for (c = 1; c <= maxCpuInCluster; ++c)
    {
    for (r = 1; r <= maxRamInCluster; ++r)
        {
        slFreeList(&perCpu[c][r]);
        }
    freeMem(perCpu[c]);
    }
freeMem(perCpu);
/* allocate machines to busy, ready, free lists */
for (mach = machineList; mach != NULL; mach = mach->next)
    {
    if (!mach->isDead)
        {
        /* See if any machines have enough resources free
         * to start their plan, and start those jobs.
         * If so, add them to the readyMachines list. */
        struct dlNode *mNode = mach->node;
        dlRemove(mNode);   /* remove it from whichever list it was on */
        if (mach->plannedBatches)   /* was anything planned for this machine? */
            {
            boolean couldRun = FALSE;
            struct batch *batch = findRunnableBatch(mach, NULL, &couldRun);
            if (batch)
                dlAddTail(readyMachines, mNode);
            else if (couldRun)
                dlAddTail(blockedMachines, mNode);
            else
                dlAddTail(busyMachines, mNode);
            }
        else
            {
            struct dlNode *jobNode = mach->jobs->head;
            if (dlEnd(jobNode))
                dlAddTail(freeMachines, mNode);
            else
                dlAddTail(busyMachines, mNode);
            }
        }
    }
if (pm)
    {
    pmClear(pm);
    pmPrintf(pm, "# machines:"
        " busy %d"
        " ready %d"
        " blocked %d"
        " free %d"
        " dead %d"
        , dlCount(busyMachines)
        , dlCount(readyMachines)
        , dlCount(blockedMachines)
        , dlCount(freeMachines)
        , dlCount(deadMachines)
        );
    pmSend(pm, rudpOut);
    pmSendString(pm, rudpOut, "end of planning");
    pmSendString(pm, rudpOut, "");
    }
needsPlanning = FALSE;
logDebug("plan finished");
}

boolean runNextJob()
/* Assign next job in pending queue if any to a machine.
 * Returns TRUE if a job was dispatched. */
{
/* give blocked machines another chance */
while (!dlEmpty(blockedMachines))
    {
    struct dlNode *mNode;
    mNode = dlPopHead(blockedMachines);
    dlAddTail(readyMachines, mNode);
    }
while(TRUE)
    {
    if (dlEmpty(readyMachines))
        return FALSE;
    if (dlEmpty(freeSpokes))
        return FALSE;
    struct dlNode *mNode;
    struct machine *machine;
    /* Get free machine */
    mNode = dlPopHead(readyMachines);
    machine = mNode->val;
    if (!machine->plannedBatches)   /* anything to do for this machine? */
        {
        struct dlNode *jobNode = machine->jobs->head;
        if (dlEnd(jobNode))
            dlAddTail(freeMachines, mNode);
        else
            dlAddTail(busyMachines, mNode);
        continue;
        }
    boolean couldRun = FALSE;   /* was it limited only by runningCount? */
    struct slRef *batchEl = NULL;
    struct batch *batch = findRunnableBatch(machine, &batchEl, &couldRun);
    if (!batch)
        {
        if (couldRun)
            dlAddTail(blockedMachines, mNode);
        else
            dlAddTail(busyMachines, mNode);
        continue;
        }
    /* remove the batch from the planning list */
    if (!slRemoveEl(&machine->plannedBatches, batchEl))
        {
        /* this should not happen */
        logWarn("unable to remove batch from machine->plannedBatches, length: %d\n", slCount(machine->plannedBatches));
        dlAddTail(freeMachines, mNode);
        continue;
        }
    freeMem(batchEl);
    if (batch->queuedCount == 0)
        {
        /* probably the batch has been chilled */
        /* needsPlanning=TRUE and a new plan will come along soon. */
        /* just put it back on the ready list, it will get looked at again */
        /* this has the effect of removing the batch from this machine's plannedBatches */
        dlAddTail(readyMachines, mNode);
        continue;
        }
    struct user *user = batch->user;
    struct dlNode *jNode, *sNode;
    struct spoke *spoke;
    struct job *job;
    /* Get free spoke and move them to busy lists. */
    machine->lastChecked = now;
    sNode = dlPopHead(freeSpokes);
    dlAddTail(busySpokes, sNode);
    spoke = sNode->val;
    /* Get active batch from user and take job off of it.
     * If it's the last job in the batch move batch to
     * finished list. */
    jNode = dlPopHead(batch->jobQueue);
    dlAddTail(runningJobs, jNode);
    job = jNode->val;
    dlAddTail(hangJobs, job->hangNode);
    ++batch->runningCount;
    --batch->queuedCount;
    ++user->runningCount;
    unactivateBatchIfEmpty(batch);
    /* Tell machine, job, and spoke about each other. */
    dlAddTail(machine->jobs, job->jobNode);
    /* just put it back on the ready list, it will get looked at again */
    dlAddTail(readyMachines, mNode);
    job->machine = machine;
    job->lastChecked = job->startTime = job->lastClockIn = now;
    spokeSendJob(spoke, machine, job);
    return TRUE;
    }
}

void runner(int count)
/* Try to run a couple of jobs.
 */
{
while (--count >= 0)
    if (!runNextJob())
        break;
}

struct machine *machineNew(char *name, char *tempDir, struct machSpec *m)
/* Create a new machine structure.  Takes ownership of m; clones name and tempDir. */
{
struct machine *mach;
AllocVar(mach);
mach->name = cloneString(name);
mach->tempDir = cloneString(tempDir);
AllocVar(mach->node);
mach->node->val = mach;
mach->machSpec = m;
mach->jobs = newDlList();
return mach;
}

void machineFree(struct machine **pMach)
/* Delete machine structure. */
{
struct machine *mach = *pMach;
if (mach != NULL)
    {
    freeMem(mach->node);
    freeMem(mach->name);
    freeMem(mach->tempDir);
    machSpecFree(&mach->machSpec);
    freeDlList(&mach->jobs);
    freez(pMach);
    }
}

struct machine *doAddMachine(char *name, char *tempDir, bits32 ip, struct machSpec *m)
/* Add machine to pool.  If you don't know ip yet just pass
 * in 0 for that argument. */
{
struct machine *mach;
mach = machineNew(name, tempDir, m);
mach->ip = ip;
dlAddTail(freeMachines, mach->node);
slAddHead(&machineList, mach);
needsPlanning = TRUE;
return mach;
}

void addMachine(char *line)
/* Process message to add machine to pool.
 * Message format: "name tempDir" (legacy, spec copied from first machine)
 * or "name cpus ramSize tempDir localDir localSize switchName". */
{
char *name = nextWord(&line);
if (hashLookup(machineHash, name))   /* ignore duplicate machines */
    {
    warn("machine already added: %s", name);
    return;
    }
char *param2 = nextWord(&line);
struct machSpec *m = NULL;
AllocVar(m);
if (!line)
    {
    /* for backwards compatibility, allow running without full spec,
     * just copy the machSpec of the first machine on the list */
    *m = *machineList->machSpec;
    m->name = cloneString(name);
    m->tempDir = cloneString(param2);
    if (!m->tempDir)
        {
        /* NOTE(review): cloned m->name is not freed on this path - confirm leak is acceptable */
        freeMem(m);
        warn("incomplete addMachine request");
        return;
        }
    }
else
    {
    m->name = cloneString(name);
    m->cpus = atoi(param2);
    m->ramSize = atoi(nextWord(&line));
    m->tempDir = cloneString(nextWord(&line));
    m->localDir = cloneString(nextWord(&line));
    m->localSize = atoi(nextWord(&line));
    m->switchName = cloneString(nextWord(&line));
    if (!m->switchName)
        {
        freeMem(m);
        warn("incomplete addMachine request");
        return;
        }
    }
doAddMachine(name, m->tempDir, 0, m);
runner(1);
}

struct machine *findMachine(char *name)
/* Find named machine. */
{
struct machine *mach;
for (mach = machineList; mach != NULL; mach = mach->next)
    {
    if (sameString(mach->name, name))
        return mach;
    }
return NULL;
}

struct job *jobFind(struct dlList *list, int id)
/* Find node of job with given id on list.  Return NULL if
 * not found. */
{
struct dlNode *el;
struct job *job;
for (el = list->head; !dlEnd(el); el = el->next)
    {
    job = el->val;
    if (job->id == id)
        return job;
    }
return NULL;
}

struct job *findWaitingJob(int id)
/* Find job that's waiting (as opposed to running).  Return
 * NULL if it can't be found. */
{
/* If it's not running look in user job queues.
 */
struct user *user;
struct job *job = NULL;
for (user = userList; user != NULL; user = user->next)
    {
    struct dlNode *node;
    for (node = user->curBatches->head; !dlEnd(node); node = node->next)
        {
        struct batch *batch = node->val;
        if ((job = jobFind(batch->jobQueue, id)) != NULL)
            break;
        }
    if (job != NULL)
        break;
    }
return job;
}

void requeueJob(struct job *job)
/* Move job from running queue back to a user pending
 * queue.  This happens when a node is down or when
 * it missed the message about a job. */
{
struct batch *batch = job->batch;
struct user *user = batch->user;
job->machine = NULL;
dlRemove(job->node);
dlAddTail(batch->jobQueue, job->node);
dlRemove(job->jobNode);
dlRemove(job->hangNode);
batch->runningCount -= 1;
batch->queuedCount += 1;
user->runningCount -= 1;
/* batch and user become active again - put them at the head of their queues */
dlRemove(batch->node);
dlAddHead(user->curBatches, batch->node);
dlRemove(user->node);
dlAddHead(queuedUsers, user->node);
if (batch->planCount == 0)
    needsPlanning = TRUE;   /* batch was inactive - the plan must account for it */
updateUserPriority(user);
updateUserMaxJob(user);
updateUserSickNodes(user);
}

void requeueAllJobs(struct machine *mach, boolean doDead)
/* Requeue all jobs on machine.  If doDead, also record the job ids
 * on mach->deadJobIds. */
{
struct dlNode *next = NULL;
struct dlNode *jobNode = NULL;
for (jobNode = mach->jobs->head; !dlEnd(jobNode); jobNode = next)
    {
    struct job *job = jobNode->val;
    next = jobNode->next;   /* grab next first - requeueJob unlinks jobNode */
    if (doDead)
        {
        struct slInt *i = slIntNew(job->id);
        slAddHead( &mach->deadJobIds, i );
        }
    /* this affects the mach->jobs list itself by removing this node */
    requeueJob(job);
    }
}

boolean removeMachine(char *machName, char *user, char *reason)
/* Remove machine from pool.
*/ { struct machine *mach; if ((mach = findMachine(machName))) { // logged as an error because it's important for admins to know that there is an // error with this machine logError("hub: user %s removed machine %s because: %s",user,machName,reason); requeueAllJobs(mach, FALSE); dlRemove(mach->node); slRemoveEl(&machineList, mach); hashRemove(machineHash, mach->name); machineFree(&mach); return TRUE; } else { logDebug("hub: user %s wanted to removed machine %s because: %s but machine was not found",user,machName,reason); return FALSE; } } void removeMachineAcknowledge(char *line, struct paraMessage *pm) /* Remove machine and send response back. */ { char *machName = nextWord(&line); char *user = nextWord(&line); char *reason = line; machName = trimSpaces(machName); char *retVal = "ok"; if (!removeMachine(machName, user, reason)) retVal = "Machine not found."; pmSendString(pm, rudpOut, retVal); pmSendString(pm, rudpOut, ""); } void machineDown(struct machine *mach) /* Mark machine as down and move it to dead list. */ { dlRemove(mach->node); mach->lastChecked = time(NULL); mach->isDead = TRUE; dlAddTail(deadMachines, mach->node); } void buryMachine(struct machine *machine) /* Reassign jobs that machine is processing and bury machine * in dead list. */ { requeueAllJobs(machine, TRUE); machineDown(machine); } void nodeDown(char *line) /* Deal with a node going down - move it to dead list and * put job back on job list. */ { struct machine *mach; char *machName = nextWord(&line); if ((mach = findMachine(machName)) != NULL) buryMachine(mach); runner(1); } char *exeFromCommand(char *cmd) /* Return executable name (without path) given command line. */ { static char exe[128]; char *s,*e; int i, size; int lastSlash = -1; /* Isolate first space-delimited word between s and e. */ s = skipLeadingSpaces(cmd); e = skipToSpaces(cmd); if (e == NULL) e = s + strlen(s); size = e - s; /* Find last '/' in this word if any, and reposition s after it. 
*/ for (i=0; i 0) s += lastSlash + 1; /* Copy whats left to string to return . */ size = e - s; if (size >= sizeof(exe)) size = sizeof(exe)-1; memcpy(exe, s, size); exe[size] = 0; return exe; } struct job *jobNew(char *cmd, char *userName, char *dir, char *in, char *out, float cpus, long long ram, char *results, boolean forQueue) /* Create a new job structure */ { struct job *job; struct user *user = findUser(userName); struct batch *batch = findBatch(user, results, FALSE); if (forQueue && (batch->continuousCrashCount >= sickBatchThreshold)) { warn("not adding job [%s] for %s, sick batch %s", cmd, userName, batch->name); unactivateBatchIfEmpty(batch); /* handle side-effect of findBatch call above */ return NULL; } AllocVar(job); AllocVar(job->jobNode); job->jobNode->val = job; AllocVar(job->node); job->node->val = job; job->id = ++nextJobId; job->exe = cloneString(exeFromCommand(cmd)); job->cmd = cloneString(cmd); job->batch = batch; job->dir = hashStoreName(stringHash, dir); job->in = cloneString(in); job->out = cloneString(out); job->cpus = cpus; job->ram = ram; AllocVar(job->hangNode); job->hangNode->val = job; return job; } void jobFree(struct job **pJob) /* Free up a job. */ { struct job *job = *pJob; if (job != NULL) { freeMem(job->jobNode); freeMem(job->node); freeMem(job->exe); freeMem(job->cmd); freeMem(job->in); freeMem(job->out); freeMem(job->err); freeMem(job->hangNode); freez(pJob); } } boolean sendViaSpoke(struct machine *machine, char *message) /* Send a message to machine via spoke. */ { struct dlNode *node = dlPopHead(freeSpokes); struct spoke *spoke; if (node == NULL) { logDebug("hub: out of spokes!"); return FALSE; } dlAddTail(busySpokes, node); spoke = node->val; spokeSendMessage(spoke, machine, message); return TRUE; } void checkDeadNodesASAP() /* Check dead nodes ASAP, some may have been fixed. * It tweaks the time since last check on all dead machines * so that grave digger will send them resurrect messages * to see if they are alive. 
*/ { struct dlNode *mNode; struct machine *machine; for (mNode = deadMachines->head; !dlEnd(mNode); mNode = mNode->next) { machine = mNode->val; machine->lastChecked = now - MINUTE * machineCheckPeriod; } } void checkPeriodically(struct dlList *machList, int period, char *checkMessage, int spokesToUse) /* Periodically send checkup messages to machines on list. */ { struct dlNode *mNode; struct machine *machine; char message[512]; int i; safef(message, sizeof(message), "%s", checkMessage); for (i=0; ihead->val; if (now - machine->lastChecked < period) break; machine->lastChecked = now; mNode = dlPopHead(machList); dlAddTail(machList, mNode); sendViaSpoke(machine, message); logDebug("hub: sending resurrect message to %s",machine->name); } } void hangman(int spokesToUse) /* Check that jobs are alive, sense if nodes are dead. Also send message for * busy nodes to check in for specific jobs, in case we missed one of their earlier * jobDone messages. */ { int i, period = jobCheckPeriod*MINUTE; struct dlNode *hangNode; struct job *job; struct machine *machine; for (i=0; ihead->val; if (now - job->lastChecked < period) break; job->lastChecked = now; hangNode = dlPopHead(hangJobs); dlAddTail(hangJobs, hangNode); machine = job->machine; if (now - job->lastClockIn >= MINUTE * assumeDeadPeriod) { warn("hub: node %s running %d looks dead, burying", machine->name, job->id); buryMachine(machine); break; /* jobs list has been freed by bury, break immediately */ } else { char message[512]; safef(message, sizeof(message), "check %d", job->id); sendViaSpoke(machine, message); } } } void graveDigger(int spokesToUse) /* Check out dead nodes. Try and resurrect them periodically. */ { checkPeriodically(deadMachines, MINUTE * machineCheckPeriod, "resurrect", spokesToUse); } void flushResults(char *batchName) /* Flush all results files. batchName can be NULL for all. 
*/ { struct resultQueue *rq; for (rq = resultQueues; rq != NULL; rq = rq->next) { if (!batchName || (rq->name == batchName)) if (rq->f != NULL) fflush(rq->f); } } void writeResults(char *fileName, char *userName, char *machineName, int jobId, char *exe, time_t submitTime, time_t startTime, char *errFile, char *cmd, char *status, char *uTime, char *sTime) /* Write out job results to output queue. This * will create the output queue if it doesn't yet * exist. */ { struct resultQueue *rq; for (rq = resultQueues; rq != NULL; rq = rq->next) if (sameString(fileName, rq->name)) break; if (rq == NULL) { AllocVar(rq); slAddHead(&resultQueues, rq); rq->name = fileName; rq->f = fopen(rq->name, "a"); if (rq->f == NULL) warn("hub: couldn't open results file %s", rq->name); rq->lastUsed = now; } if (rq->f != NULL) { fprintf(rq->f, "%s %s %d %s %s %s %lu %lu %lu %s %s '%s'\n", status, machineName, jobId, exe, uTime, sTime, submitTime, startTime, now, userName, errFile, cmd); fflush(rq->f); rq->lastUsed = now; } } void writeJobResults(struct job *job, char *status, char *uTime, char *sTime) /* Write out job results to output queue. This * will create the output queue if it doesn't yet * exist. 
*/
{
struct batch *batch = job->batch;
if (sameString(status, "0"))
    {
    /* Zero exit status - the job succeeded. */
    ++finishedJobCount;
    ++batch->doneCount;
    batch->doneTime += (now - job->startTime);
    ++batch->user->doneCount;
    batch->continuousCrashCount = 0;
    /* remember the continuous number of times this batch has crashed on this node */
    hashRemove(batch->sickNodes, job->machine->name);
    hashRemove(batch->user->sickNodes, job->machine->name);
    }
else
    {
    ++crashedJobCount;
    ++batch->crashCount;
    ++batch->continuousCrashCount;
    /* remember the continuous number of times this batch has crashed on this node */
    hashIncInt(batch->sickNodes, job->machine->name);
    updateUserSickNode(batch->user, job->machine->name);
    }
writeResults(batch->name, batch->user->name, job->machine->name,
    job->id, job->exe, job->submitTime, job->startTime,
    job->err, job->cmd, status, uTime, sTime);
}

void resultQueueFree(struct resultQueue **pRq)
/* Free up a results queue, closing file if open. */
{
struct resultQueue *rq = *pRq;
if (rq != NULL)
    {
    carefulCloseWarn(&rq->f);
    freez(pRq);
    }
}

void sweepResultsWithRemove(char *name)
/* Get rid of result queues that haven't been accessed for
 * a while. Also remove any matching name if not NULL.
 * Flushes all results. */
{
struct resultQueue *newList = NULL, *rq, *next;
for (rq = resultQueues; rq != NULL; rq = next)
    {
    next = rq->next;
    /* Pointer compare on name is intentional - batch names are interned. */
    if ((now - rq->lastUsed > 1*MINUTE) || (name && name == rq->name))
        {
        logDebug("hub: closing results file %s", rq->name);
        resultQueueFree(&rq);
        }
    else
        {
        slAddHead(&newList, rq);
        }
    }
slReverse(&newList);
resultQueues = newList;
flushResults(NULL);
}

void saveJobId()
/* Save job ID.  Rewrites the job id file in place so the hub can
 * resume from a sane id after a restart. */
{
rewind(jobIdFile);
writeOne(jobIdFile, nextJobId);
fflush(jobIdFile);
if (ferror(jobIdFile))
    errnoAbort("can't write job id file %s", jobIdFileName);
}

void openJobId()
/* Open file with jobID in it and read jobId.  Bump it
 * by 100000 in case we crashed to avoid reusing job
 * id's, but do reuse every 2 billion.  Let command line
 * overwrite this though . */
{
jobIdFile = fopen(jobIdFileName, "r+");
if (jobIdFile != NULL)
    {
    (void)readOne(jobIdFile, nextJobId);
    nextJobId += 100000;
    }
else
    jobIdFile = mustOpen(jobIdFileName, "w");
if (nextJobId < 0)
    nextJobId = 0;   /* wrapped past 2 billion - start over */
nextJobId = optionInt("nextJobId", nextJobId);
}

void processHeartbeat()
/* Check that system is ok.  See if we can do anything useful. */
{
int spokesToUse;

if (needsPlanning)
    plan(NULL);
runner(30);
spokesToUse = dlCount(freeSpokes);
if (spokesToUse > 0)
    {
    /* Use roughly half the free spokes (but at least one) for checkups. */
    spokesToUse >>= 1;
    spokesToUse -= 1;
    if (spokesToUse < 1) spokesToUse = 1;
    graveDigger(spokesToUse);
    hangman(spokesToUse);
    sweepResultsWithRemove(NULL);
    saveJobId();
    }
}

boolean sendKillJobMessage(struct machine *machine, int jobId)
/* Send message to compute node to kill job there.  Returns FALSE
 * if no spoke was free to carry the message. */
{
char message[64];
safef(message, sizeof(message), "kill %d", jobId);
logDebug("hub: %s %s", machine->name, message);
if (!sendViaSpoke(machine, message))
    {
    return FALSE;
    }
return TRUE;
}

void nodeAlive(char *line)
/* Deal with message from node that says it's alive.
 * Move it from dead to free list.  The major complication
 * of this occurs if the node was running a job and it
 * didn't really go down, we just lost communication with it.
 * In this case we will have restarted the job elsewhere, and
 * that other copy could be conflicting with the copy of
 * the job the node is still running.
*/
{
char *name = nextWord(&line), *jobIdString;
int jobId;
struct machine *mach;
struct dlNode *node;
boolean hostFound = FALSE;

for (node = deadMachines->head; !dlEnd(node); node = node->next)
    {
    mach = node->val;
    if (sameString(mach->name, name) && mach->isDead)
        {
        hostFound = TRUE;
        /* Machine is back: move to free list and trigger a replan. */
        dlRemove(node);
        dlAddTail(freeMachines, node);
        needsPlanning = TRUE;
        mach->isDead = FALSE;
        if (mach->deadJobIds != NULL)
            {
            /* Machine went down with jobs assigned.  The rest of the alive
             * message lists job ids the node still thinks it is running;
             * reconcile each against what the hub did in the meantime. */
            struct dyString *dy = newDyString(0);
            struct slInt *i = mach->deadJobIds;
            dyStringPrintf(dy, "hub: node %s assigned ", name);
            for(i = mach->deadJobIds; i; i = i->next)
                dyStringPrintf(dy, "%d ", i->val);
            dyStringPrintf(dy, "came back.");
            /* NOTE(review): dy->string is used as the format string here.
             * The content is hub-generated (name + numeric ids), but
             * logWarn("%s", dy->string) would be the safer form. */
            logWarn(dy->string);
            dyStringFree(&dy);
            while ((jobIdString = nextWord(&line)) != NULL)
                {
                jobId = atoi(jobIdString);
                if ((i = slIntFind(mach->deadJobIds, jobId)))
                    {
                    struct job *job;
                    warn("hub: Looks like %s is still keeping track of %d", name, jobId);
                    if ((job = findWaitingJob(jobId)) != NULL)
                        {
                        /* Job was requeued but not yet rerun - hand it back
                         * to the machine that never stopped running it. */
                        warn("hub: Luckily rerun of job %d has not yet happened.", jobId);
                        job->machine = mach;
                        dlAddTail(mach->jobs, job->jobNode);
                        job->lastChecked = mach->lastChecked = job->lastClockIn = now;
                        dlRemove(job->node);
                        dlAddTail(runningJobs, job->node);
                        dlRemove(mach->node);
                        dlAddTail(busyMachines, mach->node);
                        dlAddTail(hangJobs, job->hangNode);
                        struct batch *batch = job->batch;
                        struct user *user = batch->user;
                        batch->runningCount += 1;
                        batch->queuedCount -= 1;
                        user->runningCount += 1;
                        }
                    else if ((job = jobFind(runningJobs, jobId)) != NULL)
                        {
                        /* Job is running on resurrected machine and another.
                         * Kill it on both since the output it created could
                         * be corrupt at this point.  Then add it back to job
                         * queue. */
                        warn("hub: Job %d is running on %s as well.", jobId, job->machine->name);
                        sendKillJobMessage(mach, job->id);
                        sendKillJobMessage(job->machine, job->id);
                        requeueJob(job);
                        }
                    else
                        {
                        /* This case should be very rare.  It should happen when
                         * a node is out of touch for 2 hours, but when it comes
                         * back is running a job that we reran to completion
                         * on another node. */
                        warn("hub: Job %d has finished running, there is a conflict. "
                             "Data may be corrupted, and it will take a lot of logic to fix.", jobId);
                        }
                    }
                }
            }
        slFreeList(&mach->deadJobIds);
        runner(1);
        break;
        }
    }
if (!hostFound)
    {
    warn("hub 'alive $HOST' msg handler: unable to resurrect host %s, "
         "not find in deadMachines list.", name);
    }
}

void recycleMachine(struct machine *mach)
/* Recycle machine into free list. */
{
dlRemove(mach->node);
dlAddTail(readyMachines, mach->node);
}

void recycleJob(struct job *job)
/* Remove job from lists and free up memory associated with it. */
{
dlRemove(job->node);
jobFree(&job);
}

void nodeCheckIn(char *line)
/* Deal with check in message from node.  Message words are
 * machine name, job id, and the node's view of job status. */
{
char *machine = nextWord(&line);
char *jobIdString = nextWord(&line);
char *status = nextWord(&line);
int jobId = atoi(jobIdString);
if (status != NULL)
    {
    struct job *job = jobFind(runningJobs, jobId);
    if (job != NULL)
        {
        job->lastClockIn = now;
        if (!sameWord(job->machine->name, machine))
            {
            logError("hub: checkIn %s %s %s should be from %s", machine,
                jobIdString, status, job->machine->name);
            }
        }
    else
        {
        logError("hub: checkIn of unknown job: %s %s %s", machine, jobIdString, status);
        }
    if (sameString(status, "free"))
        {
        /* Node thinks it's free, we think it has a job.  Node
         * must have missed our job assignment... */
        if (job != NULL)
            {
            struct machine *mach = job->machine;
            if (mach != NULL)
                {
                dlRemove(mach->node);
                dlAddTail(readyMachines, mach->node);
                }
            requeueJob(job);
            logDebug("hub: requeueing job in nodeCheckIn");
            runner(1);
            }
        }
    }
}

void recycleSpoke(char *spokeName)
/* Try to find spoke and put it back on free list.
*/
{
struct dlNode *node;
struct spoke *spoke;
boolean foundSpoke = FALSE;

for (node = busySpokes->head; !dlEnd(node); node = node->next)
    {
    spoke = node->val;
    if (sameString(spoke->name, spokeName))
        {
        dlRemove(spoke->node);
        dlAddTail(freeSpokes, spoke->node);
        foundSpoke = TRUE;
        break;
        }
    }
if (!foundSpoke)
    warn("Couldn't free spoke %s", spokeName);
else
    runner(1);   /* a spoke came free - see if there is work for it */
}

int addJob(char *userName, char *dir, char *in, char *out, char *results,
        float cpus, long long ram, char *command)
/* Add job to queues.  Returns the new job id, or 0 if the job
 * was refused (sick batch). */
{
struct job *job;
struct user *user;
struct batch *batch;

job = jobNew(command, userName, dir, in, out, cpus, ram, results, TRUE);
if (!job)
    {
    return 0;
    }
batch = job->batch;
dlAddTail(batch->jobQueue, job->node);
++batch->queuedCount;
int oldCpu = batch->cpu;
int oldRam = batch->ram;
if (job->cpus)
    batch->cpu = (job->cpus + 0.5) / cpuUnit; /* rounding */
else
    {
    /* if no cpus specified, use the default */
    batch->cpu = defaultJobCpu;
    job->cpus = defaultJobCpu * cpuUnit;
    }
if (job->ram)
    batch->ram = 1 + (job->ram - 1) / ramUnit;
    /* any remainder will be rounded upwards
     * e.g. 1 to 1024m --> 1G but 1025m --> 2G if unit is 1G.
     * 0m would just cause default ram usage. */
else
    {
    /* if no ram size specified, use the default */
    batch->ram = defaultJobRam;
    job->ram = defaultJobRam * ramUnit;
    }
if (oldCpu != batch->cpu || oldRam != batch->ram)
    {
    /* Resource needs changed - the machine/batch plan is stale. */
    needsPlanning = TRUE;
    }
if (batch->planCount == 0)
    {
    needsPlanning = TRUE;
    }
user = batch->user;
dlRemove(user->node);
dlAddTail(queuedUsers, user->node);
job->submitTime = time(NULL);
return job->id;
}

int addJobFromMessage(char *line, int addJobVersion)
/* Parse out addJob message and add job to queues. */
{
char *userName, *dir, *in, *out, *results, *command;
float cpus = 0;
long long ram = 0;

if ((userName = nextWord(&line)) == NULL)
    return 0;
if ((dir = nextWord(&line)) == NULL)
    return 0;
if ((in = nextWord(&line)) == NULL)
    return 0;
if ((out = nextWord(&line)) == NULL)
    return 0;
if ((results = nextWord(&line)) == NULL)
    return 0;
if (addJobVersion == 2)
    {
    /* Version 2 messages carry explicit cpu and ram requests. */
    char *tempCpus = NULL;
    char *tempRam = NULL;
    if ((tempCpus = nextWord(&line)) == NULL)
        return 0;
    if ((tempRam = nextWord(&line)) == NULL)
        return 0;
    cpus = sqlFloat(tempCpus);
    ram = sqlLongLong(tempRam);
    }
if (line == NULL || line[0] == 0)
    return 0;
command = line;   /* remainder of the message is the command itself */
return addJob(userName, dir, in, out, results, cpus, ram, command);
}

void addJobAcknowledge(char *line, struct paraMessage *pm, int addJobVersion)
/* Add job.  Line format is <user> <dir> <stdin> <stdout> <results>
 * [<cpus> <ram>] <command>  (bracketed words only in version 2 messages;
 * format text restored from the parse order in addJobFromMessage).
 * Returns job ID or 0 if a problem.  Send jobId back to client. */
{
int id = addJobFromMessage(line, addJobVersion);
pmClear(pm);
pmPrintf(pm, "%d", id);
pmSend(pm, rudpOut);
runner(1);
}

int setMaxJob(char *userName, char *dir, int maxJob)
/* Set new maxJob for batch */
{
/* NOTE(review): findBatch is called before the user NULL check - this
 * pattern recurs throughout the file, so presumably findBatch tolerates
 * a NULL user; verify against its definition. */
struct user *user = findUser(userName);
struct batch *batch = findBatch(user, dir, TRUE);
if (user == NULL)
    return -2;
if (batch == NULL)
    return -2;
needsPlanning = TRUE;
batch->maxJob = maxJob;
updateUserMaxJob(user);
if (maxJob>=-1)
    logDebug("paraHub: User %s set maxJob=%d for batch %s", userName, maxJob, dir);
return maxJob;
}

int setMaxJobFromMessage(char *line)
/* Parse out setMaxJob message and set new maxJob for batch, update user-maxJob. */
{
char *userName, *dir;
int maxJob;

if ((userName = nextWord(&line)) == NULL)
    return -2;
if ((dir = nextWord(&line)) == NULL)
    return -2;
/* NOTE(review): nextWord may return NULL here, which atoi would not
 * survive; relies on well-formed messages. */
if ((maxJob = atoi(nextWord(&line))) < -1)
    return -2;
return setMaxJob(userName, dir, maxJob);
}

void setMaxJobAcknowledge(char *line, struct paraMessage *pm)
/* Set batch maxJob.  Line format is <user> <dir> <maxJob>
 * (format text restored from setMaxJobFromMessage's parse order).
 * Returns new maxJob or -2 if a problem.  Send new maxJob back to client.
*/
{
int maxJob = setMaxJobFromMessage(line);
pmClear(pm);
pmPrintf(pm, "%d", maxJob);
pmSend(pm, rudpOut);
}

int resetCounts(char *userName, char *dir)
/* Reset done and crashed batch counts */
{
struct user *user = findUser(userName);
struct batch *batch = findBatch(user, dir, TRUE);
if (user == NULL)
    return -2;
if (batch == NULL)
    return -2;
batch->doneCount = 0;
batch->doneTime = 0;
batch->crashCount = 0;
logDebug("paraHub: User %s reset done and crashed counts for batch %s", userName, dir);
return 0;
}

int resetCountsFromMessage(char *line)
/* Parse out resetCounts message and reset counts for batch. */
{
char *userName, *dir;

if ((userName = nextWord(&line)) == NULL)
    return -2;
if ((dir = nextWord(&line)) == NULL)
    return -2;
return resetCounts(userName, dir);
}

void resetCountsAcknowledge(char *line, struct paraMessage *pm)
/* Resets batch counts for done and crashed.  Line format is <user> <dir>
 * Returns new maxJob or -2 if a problem.  Send new maxJob back to client. */
{
int resetCounts = resetCountsFromMessage(line);   /* local shadows the function name */
pmClear(pm);
pmPrintf(pm, "%d",resetCounts);
pmSend(pm, rudpOut);
}

int freeBatch(char *userName, char *batchName)
/* Free batch resources, if possible.  Returns 0 on success,
 * -1 if batch busy, -2 if batch not found, -3 if user not found. */
{
struct user *user = findUser(userName);
if (user == NULL)
    return -3;
struct hashEl *hel = hashLookup(stringHash, batchName);
if (hel == NULL)
    return -2;
char *name = hel->name;   /* interned copy of the batch name */
struct batch *batch = findBatchInList(user->curBatches, name);
if (batch == NULL)
    batch = findBatchInList(user->oldBatches, name);
if (batch == NULL)
    return -2;
/* make sure nothing running and queue empty */
if (batch->runningCount > 0)
    return -1;
if (!dlEnd(batch->jobQueue->head))
    return -1;
sweepResultsWithRemove(name);
logDebug("paraHub: User %s freed batch %s", userName, batchName);
/* remove batch from batchList */
slRemoveEl(&batchList, batch);
/* remove from user cur/old batches */
dlRemove(batch->node);
/* free batch and its members */
freeMem(batch->node);
hashRemove(stringHash, name);
freeDlList(&batch->jobQueue);
freeHash(&batch->sickNodes);
freeMem(batch);
return 0;
}

int freeBatchFromMessage(char *line)
/* Parse out freeBatch message and free batch. */
{
char *userName, *batchName;

if ((userName = nextWord(&line)) == NULL)
    return -2;
if ((batchName = nextWord(&line)) == NULL)
    return -2;
return freeBatch(userName, batchName);
}

void freeBatchAcknowledge(char *line, struct paraMessage *pm)
/* Free batch resources.  Line format is <user> <batch>
 * Returns 0 if success or some err # if a problem.  Sends result back to client. */
{
int result = freeBatchFromMessage(line);
pmClear(pm);
pmPrintf(pm, "%d",result);
pmSend(pm, rudpOut);
}

int flushResultsByRequest(char *userName, char *batchName)
/* Flush results file.  Return 0 if nothing running and queue empty. */
{
struct user *user = findUser(userName);
if (user == NULL)
    return -3;
struct hashEl *hel = hashLookup(stringHash, batchName);
if (hel == NULL)
    return -2;
char *name = hel->name;
struct batch *batch = findBatchInList(user->curBatches, name);
if (batch == NULL)
    batch = findBatchInList(user->oldBatches, name);
if (batch == NULL)
    return -2;
flushResults(batch->name);
logDebug("paraHub: User %s flushed results batch %s", userName, batchName);
/* return 0 if nothing running and queue empty */
if (batch->runningCount > 0)
    return -1;
if (!dlEnd(batch->jobQueue->head))
    return -1;
return 0;
}

int flushResultsFromMessage(char *line)
/* Parse out flushResults message and flush the results file. */
{
char *userName, *batchName;

if ((userName = nextWord(&line)) == NULL)
    return -2;
if ((batchName = nextWord(&line)) == NULL)
    return -2;
return flushResultsByRequest(userName, batchName);
}

void flushResultsAcknowledge(char *line, struct paraMessage *pm)
/* Flush results file.  Line format is <user> <batch>
 * Returns 0 if success or some err # if a problem.  Sends result back to client.
*/ { int result = flushResultsFromMessage(line); pmClear(pm); pmPrintf(pm, "%d",result); pmSend(pm, rudpOut); } int clearSickNodes(char *userName, char *dir) /* Clear sick nodes for batch */ { struct user *user = findUser(userName); struct batch *batch = findBatch(user, dir, TRUE); if (user == NULL) return -2; if (batch == NULL) return -2; hashFree(&batch->sickNodes); batch->sickNodes = newHashExt(6, FALSE); batch->continuousCrashCount = 0; /* reset so user can retry */ needsPlanning = TRUE; updateUserSickNodes(user); logDebug("paraHub: User %s cleared sick nodes for batch %s", userName, dir); return 0; } int clearSickNodesFromMessage(char *line) /* Parse out clearSickNodes message and call clear nodes. */ { char *userName, *dir; if ((userName = nextWord(&line)) == NULL) return -2; if ((dir = nextWord(&line)) == NULL) return -2; return clearSickNodes(userName, dir); } void clearSickNodesAcknowledge(char *line, struct paraMessage *pm) /* Clear sick nodes from batch. Line format is * Returns 0 or -2 if a problem back to client. 
*/ { int result = clearSickNodesFromMessage(line); pmClear(pm); pmPrintf(pm, "%d", result); pmSend(pm, rudpOut); } int showSickNodes(char *userName, char *dir, struct paraMessage *pm) /* Show sick nodes for batch */ { int machineCount = 0, sickCount = 0; struct user *user = findUser(userName); struct batch *batch = findBatch(user, dir, TRUE); if (user == NULL) return -2; if (batch == NULL) return -2; logDebug("paraHub: User %s ran showSickNodes for batch %s", userName, dir); struct hashEl *el, *list = hashElListHash(batch->sickNodes); slSort(&list, hashElCmp); for (el = list; el != NULL; el = el->next) { int failures = ptToInt(el->val); if (failures >= sickNodeThreshold) { ++machineCount; sickCount += failures; pmClear(pm); pmPrintf(pm, "%s %d", el->name, ptToInt(el->val)); pmSend(pm, rudpOut); } } hashElFreeList(&list); pmClear(pm); pmPrintf(pm, "total sick machines: %d failures: %d", machineCount, sickCount); pmSend(pm, rudpOut); return 0; } int showSickNodesFromMessage(char *line, struct paraMessage *pm) /* Parse out showSickNodes message and print sick nodes for batch. */ { char *userName, *dir; if ((userName = nextWord(&line)) == NULL) return -2; if ((dir = nextWord(&line)) == NULL) return -2; return showSickNodes(userName, dir, pm); } void showSickNodesAcknowledge(char *line, struct paraMessage *pm) /* Show sick nodes from batch. Line format is * Returns just empty line if a problem back to client. */ { showSickNodesFromMessage(line,pm); pmClear(pm); pmSend(pm, rudpOut); } int setPriority(char *userName, char *dir, int priority) /* Set new priority for batch */ { struct user *user = findUser(userName); struct batch *batch = findBatch(user, dir, TRUE); if (user == NULL) return 0; if (batch == NULL) return 0; needsPlanning = TRUE; batch->priority = priority; updateUserPriority(user); if ((priority>=1)&&(priority * Returns new priority or 0 if a problem. Send new priority back to client. 
*/
{
int priority = setPriorityFromMessage(line);
pmClear(pm);
pmPrintf(pm, "%d", priority);
pmSend(pm, rudpOut);
}

void respondToPing(struct paraMessage *pm)
/* Someone want's to know we're alive.  Reply "ok" and use the
 * occasion to do heartbeat chores. */
{
pmSendString(pm, rudpOut, "ok");
processHeartbeat();
}

void finishJob(struct job *job)
/* Recycle job memory and the machine it's running on. */
{
struct machine *mach = job->machine;
struct batch *batch = job->batch;
struct user *user = batch->user;

if (mach != NULL)
    {
    /* see if the node appears to be sick for this batch */
    if (hashIntValDefault(batch->sickNodes, mach->name, 0) >= sickNodeThreshold)
        {
        /* skip adding back to the mach->plannedBatches list */
        needsPlanning = TRUE;
        }
    else if (!job->oldPlan)
        {
        /* add its batch to end of list so it gets run again on same machine */
        struct slRef *el = slRefNew(batch);
        slAddTail(&mach->plannedBatches, el);
        }
    recycleMachine(mach);
    /* NOTE I moved the following two lines inside the if (mach != NULL) block
     * because this may fix the problem where we were seeing users get duplicate
     * jobDone messages or something like that causing users to get
     * e.g. -200 user->runningCount which then made them hog the whole cluster. */
    batch->runningCount -= 1;
    user->runningCount -= 1;
    }
dlRemove(job->jobNode);
dlRemove(job->hangNode);
recycleJob(job);
}

boolean removeRunningJob(struct job *job)
/* Remove job - if it's running kill it,  remove from job list.
 * Returns FALSE if the kill message couldn't be sent (no spoke). */
{
if (!sendKillJobMessage(job->machine, job->id))
    return FALSE;
finishJob(job);
return TRUE;
}

void removePendingJob(struct job *job)
/* Remove job from pending queue. */
{
struct batch *batch = job->batch;
recycleJob(job);
unactivateBatchIfEmpty(batch);
}

boolean removeJobId(int id)
/* Remove job of a given id. */
{
struct job *job = jobFind(runningJobs, id);
if (job != NULL)
    {
    logDebug("Removing %s's %s", job->batch->user->name, job->cmd);
    if (!removeRunningJob(job))
        return FALSE;
    }
else
    {
    job = findWaitingJob(id);
    if (job != NULL)
        {
        logDebug("Pending job %s", job->cmd);
        removePendingJob(job);
        }
    }
return TRUE;
}

void removeJobAcknowledge(char *names, struct paraMessage *pm)
/* Remove job of a given name(s). */
{
char *name;
char *retVal = "ok";

while ((name = nextWord(&names)) != NULL)
    {
    /* It is possible for this remove to fail if we
     * run out of spokes at the wrong time.  Currently
     * the para client will just report the problem. */
    if (!removeJobId(atoi(name)))
        {
        retVal = "err";
        break;
        }
    }
pmSendString(pm, rudpOut, retVal);
}

void chillABatch(struct batch *batch)
/* Stop launching jobs from a batch, but don't disturb
 * running jobs. */
{
struct user *user = batch->user;
struct dlNode *el, *next;
for (el = batch->jobQueue->head; !dlEnd(el); el = next)
    {
    struct job *job = el->val;
    next = el->next;
    recycleJob(job);	/* This free's el too! */
    }
batch->queuedCount = 0;
batch->planCount = 0;
dlRemove(batch->node);
dlAddTail(user->oldBatches, batch->node);
needsPlanning = TRUE;
updateUserPriority(user);
updateUserMaxJob(user);
updateUserSickNodes(user);
}

void chillBatch(char *line, struct paraMessage *pm)
/* Parse user and batch names from message,
 * call chillABatch to clear the queue,
 * and send response ok or err to client. */
{
char *userName = nextWord(&line);
char *batchName = nextWord(&line);
char *res = "err";
if (batchName != NULL)
    {
    struct user *user = hashFindVal(userHash, userName);
    if (user != NULL)
        {
        struct batch *batch;
        batchName = hashStoreName(stringHash, batchName);
        batch = findBatchInList(user->curBatches, batchName);
        if (batch != NULL)
            {
            chillABatch(batch);
            }
        /* "ok" even when the batch wasn't found - an already-gone
         * batch is treated as successfully chilled. */
        res = "ok";
        }
    }
pmSendString(pm, rudpOut, res);
}

void jobDone(char *line)
/* Handle job is done message.
*/
{
struct job *job;
char *id = nextWord(&line);
char *status = nextWord(&line);
char *uTime = nextWord(&line);
char *sTime = nextWord(&line);

if (sTime != NULL)   /* all four words present - message is well formed */
    {
    job = jobFind(runningJobs, atoi(id));
    if (job != NULL)
        {
        struct machine *machine = job->machine;
        if (machine != NULL)
            {
            machine->lastChecked = now;
            if (sameString(status, "0"))
                machine->goodCount += 1;
            else
                machine->errCount += 1;
            }
        writeJobResults(job, status, uTime, sTime);
        struct batch *batch = job->batch;   /* save before finishJob frees job */
        finishJob(job);
        /* is the batch sick? */
        if (batch->continuousCrashCount >= sickBatchThreshold)
            {
            chillABatch(batch);
            }
        runner(1);
        }
    }
}

void listMachines(struct paraMessage *pm)
/* Write list of machines to fd.  Format is one machine per message
 * followed by a blank message. */
{
struct machine *mach;
for (mach = machineList; mach != NULL; mach = mach->next)
    {
    struct dlNode *jobNode = mach->jobs->head;
    do
        {
        /* this list may output multiple rows per machine, one for each running job */
        pmClear(pm);
        pmPrintf(pm, "%-10s good %d, err %d, ", mach->name, mach->goodCount, mach->errCount);
        if (dlEmpty(mach->jobs))
            {
            if (mach->isDead)
                pmPrintf(pm, "dead");
            else
                pmPrintf(pm, "idle");
            }
        else
            {
            struct job *job = jobNode->val;
            pmPrintf(pm, "running %-10s %s ", job->batch->user->name, job->cmd);
            jobNode = jobNode->next;
            }
        pmSend(pm, rudpOut);
        }
    while (!(dlEmpty(mach->jobs) || dlEnd(jobNode)));
    }
pmSendString(pm, rudpOut, "");
}

int countUserActiveBatches(struct user *user)
/* Count active batches for user. */
{
int count = dlCount(user->curBatches);	/* Start with batches with pending jobs. */
struct dlNode *node;
/* Add in batches with running but no pending jobs. */
for (node = user->oldBatches->head; !dlEnd(node); node = node->next)
    {
    struct batch *batch = node->val;
    if (batch->runningCount > 0)
        ++count;
    }
return count;
}

void listUsers(struct paraMessage *pm)
/* Write list of users to fd.  Format is one user per line
 * followed by a blank line. */
{
struct user *user;
for (user = userList; user != NULL; user = user->next)
    {
    int totalBatch = dlCount(user->curBatches) + dlCount(user->oldBatches);
    pmClear(pm);
    pmPrintf(pm, "%s ", user->name);
    pmPrintf(pm,
        "%d jobs running, %d waiting, %d finished, %d of %d batches active"
        ", priority=%d"
        ", maxJob=%d"
        , user->runningCount, userQueuedCount(user), user->doneCount,
        countUserActiveBatches(user), totalBatch, user->priority
        , user->maxJob
        );
    pmSend(pm, rudpOut);
    }
pmSendString(pm, rudpOut, "");
}

void writeOneBatchInfo(struct paraMessage *pm, struct user *user, struct batch *batch)
/* Write out info on one batch. */
{
char shortBatchName[512];
splitPath(batch->name, shortBatchName, NULL, NULL);
pmClear(pm);
pmPrintf(pm, "%-8s %4d %6d %6d %5d %3d %3d %3d %4.1fg %4d %3d %s",
    user->name, batch->runningCount,
    batch->queuedCount, batch->doneCount, batch->crashCount,
    batch->priority, batch->maxJob, batch->cpu,
    ((float)batch->ram*ramUnit)/(1024*1024*1024),
    batch->planCount,
    (avgBatchTime(batch)+30)/60, shortBatchName);
pmSend(pm, rudpOut);
}

void listBatches(struct paraMessage *pm)
/* Write list of batches.  Format is one batch per
 * line followed by a blank line. */
{
struct user *user;
pmSendString(pm, rudpOut, "#user run wait done crash pri max cpu ram plan min batch");
for (user = userList; user != NULL; user = user->next)
    {
    struct dlNode *bNode;
    for (bNode = user->curBatches->head; !dlEnd(bNode); bNode = bNode->next)
        {
        writeOneBatchInfo(pm, user, bNode->val);
        }
    /* Also show old batches that still have jobs running. */
    for (bNode = user->oldBatches->head; !dlEnd(bNode); bNode = bNode->next)
        {
        struct batch *batch = bNode->val;
        if (batch->runningCount > 0)
            writeOneBatchInfo(pm, user, batch);
        }
    }
pmSendString(pm, rudpOut, "");
}

void appendLocalTime(struct paraMessage *pm, time_t t)
/* Append time t converted to day/time format to pm.
*/ { struct tm *tm; tm = localtime(&t); pmPrintf(pm, "%04d/%02d/%02d %02d:%02d:%02d", 1900+tm->tm_year, 1+tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec); } char *upToFirstDot(char *s, bool dotQ) /* Return string up to first dot. */ { static char ret[128]; int size; char *e = strchr(s, '.'); if (e == NULL) size = strlen(s); else size = e - s; if (size >= sizeof(ret)-2) /* Leave room for .q */ size = sizeof(ret)-3; memcpy(ret, s, size); ret[size] = 0; if (dotQ) strcat(ret, ".q"); return ret; } boolean oneJobList(struct paraMessage *pm, struct dlList *list, boolean sinceStart, boolean extended) /* Write out one job list. Return FALSE if there is a problem. */ { struct dlNode *el; struct job *job; char *machName; for (el = list->head; !dlEnd(el); el = el->next) { job = el->val; if (job->machine != NULL) machName = upToFirstDot(job->machine->name, FALSE); else machName = "none"; pmClear(pm); pmPrintf(pm, "%-4d %-10s %-10s ", job->id, machName, job->batch->user->name); if (sinceStart) appendLocalTime(pm, job->startTime); else appendLocalTime(pm, job->submitTime); pmPrintf(pm, " %s", job->cmd); if (extended) pmPrintf(pm, " %s", job->batch->name); if (!pmSend(pm, rudpOut)) return FALSE; } return TRUE; } void listJobs(struct paraMessage *pm, boolean extended) /* Write list of jobs. Format is one job per message * followed by a blank message. */ { struct user *user; struct dlNode *bNode; struct batch *batch; if (!oneJobList(pm, runningJobs, TRUE, extended)) return; for (user = userList; user != NULL; user = user->next) { for (bNode = user->curBatches->head; !dlEnd(bNode); bNode = bNode->next) { batch = bNode->val; if (!oneJobList(pm, batch->jobQueue, FALSE, extended)) return; } } pmSendString(pm, rudpOut, ""); } boolean onePstatList(struct paraMessage *pm, struct dlList *list, boolean running, boolean extended, int *resultCount) /* Write out one job list in pstat format. Return FALSE if there is * a problem. 
*/
{
struct dlNode *node;
struct job *job;
time_t t;
char *machName;
char *state = (running ? "r" : "q");
int count = 0;
char buf[rudpMaxSize];
char *terminator = "";
/* Extended mode packs several newline-terminated records into one message. */
if (extended)
    terminator = "\n";
pmClear(pm);
for (node = list->head; !dlEnd(node); node = node->next)
    {
    ++count;
    job = node->val;
    if (job->machine != NULL)
        machName = job->machine->name;
    else
        machName = "none";
    if (running)
        t = job->startTime;
    else
        t = job->submitTime;
    /* Queued jobs in extended mode use a short form; otherwise full record.
     * NOTE(review): %lu assumes time_t fits in unsigned long -- confirm on
     * the target platforms. */
    if (!running && extended)
        safef(buf, sizeof(buf), "%s %d\n", state, job->id);
    else
        safef(buf, sizeof(buf), "%s %d %s %s %lu %s%s",
            state, job->id, job->batch->user->name, job->exe, t, machName, terminator);
    /* Flush when non-extended (one record per message), or when the next
     * record would overflow the datagram. */
    if ((!extended && pm->size > 0) || (pm->size + strlen(buf) > rudpMaxSize))
        {
        if (!pmSend(pm, rudpOut))
            {
            *resultCount += count;
            return FALSE;
            }
        pmClear(pm);
        }
    pmPrintf(pm, "%s", buf);
    }
/* Send any partially filled final message. */
if (pm->size > 0)
    {
    if (!pmSend(pm, rudpOut))
        {
        *resultCount += count;
        return FALSE;
        }
    }
*resultCount += count;
return TRUE;
}

void pstat(char *line, struct paraMessage *pm, boolean extended)
/* Write list of jobs in pstat format.
 * Extended pstat2 format means we only show queued jobs for the
 * specific batch, but we still need to return total queue size
 * and also we can include a status about batch failure.
 * Older versions of para will not call extended but should still work.*/
{
struct user *user;
struct dlNode *bNode;
struct batch *batch = NULL;
char *userName, *dir;
struct user *thisUser = NULL;
struct batch *thisBatch = NULL;
int count = 0;

/* Optional "user dir" arguments on line narrow the listing to one batch. */
userName = nextWord(&line);
dir = nextWord(&line);
if (userName)
    thisUser = findUser(userName);
if (dir)
    thisBatch = findBatch(thisUser, dir, TRUE);
if (thisBatch)
    flushResults(thisBatch->name);

if (!onePstatList(pm, runningJobs, TRUE, extended, &count))
    return;
for (user = userList; user != NULL; user = user->next)
    {
    for (bNode = user->curBatches->head; !dlEnd(bNode); bNode = bNode->next)
        {
        batch = bNode->val;
        if ((thisUser == NULL || thisUser == user)
            && (thisBatch == NULL || thisBatch == batch))
            {
            if (!onePstatList(pm, batch->jobQueue, FALSE, extended, &count))
                return;
            }
        else
            count += batch->queuedCount;  /* Skipped batches still add to the total. */
        }
    }
if (extended)
    {
    pmClear(pm);
    pmPrintf(pm, "Total Jobs: %d", count);
    if (!pmSend(pm, rudpOut))
        return;
    /* Flag a batch whose recent jobs have all crashed. */
    if (thisBatch && (thisBatch->continuousCrashCount >= sickBatchThreshold))
        {
        pmClear(pm);
        pmPrintf(pm, "Sick Batch: consecutive crashes (%d) >= sick batch threshold (%d)",
            thisBatch->continuousCrashCount, sickBatchThreshold);
        if (!pmSend(pm, rudpOut))
            return;
        }
    if (thisBatch)
        {
        off_t resultsSize = fileSize(thisBatch->name);
        if (resultsSize != -1) // file exists
            {
            pmClear(pm);
            pmPrintf(pm, "Results Size: %lld", (long long) resultsSize);
            if (!pmSend(pm, rudpOut))
                return;
            }
        }
    }
pmSendString(pm, rudpOut, "");
}

int sumPendingJobs()
/* Return sum of all queued (not yet running) jobs over all users.
*/ { struct user *user; int count = 0; for (user = userList; user != NULL; user = user->next) count += userQueuedCount(user); return count; } int countActiveUsers() /* Return count of users with jobs running or in queue */ { struct user *user; int count = 0; for (user = userList; user != NULL; user = user->next) { if (userIsActive(user)) ++count; } return count; } int countActiveBatches() /* Return count of active batches */ { int count = 0; struct user *user; for (user = userList; user != NULL; user = user->next) count += countUserActiveBatches(user); return count; } int getCpus(struct dlList *list) /* Return total CPU resources in list. */ { int count = 0; struct dlNode *node = NULL; for (node = list->head; !dlEnd(node); node = node->next) { struct machine *mach = node->val; count += mach->machSpec->cpus; } return count; } int getBusyCpus(struct dlList *list) /* Return total CPU resources in list. */ { int count = 0; struct dlNode *node = NULL; for (node = list->head; !dlEnd(node); node = node->next) { struct machine *machine = node->val; /* all the cpus now in-use */ struct dlNode *jobNode = NULL; for (jobNode = machine->jobs->head; !dlEnd(jobNode); jobNode = jobNode->next) { struct job *job = jobNode->val; struct batch * batch =job->batch; count += batch->cpu; } } return count; } void status(struct paraMessage *pm) /* Write summary status. Format is one line per message * followed by a blank message. 
*/ { char buf[256]; int totalCpus = getCpus(freeMachines)+getCpus(readyMachines)+getCpus(blockedMachines)+getCpus(busyMachines); int busyCpus = getBusyCpus(readyMachines)+getBusyCpus(blockedMachines)+getBusyCpus(busyMachines); safef(buf, sizeof(buf), "CPUs total: %d", totalCpus); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "CPUs free: %d", totalCpus - busyCpus); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "CPUs busy: %d", busyCpus); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Nodes total: %d", dlCount(freeMachines)+dlCount(busyMachines)+dlCount(readyMachines)+ dlCount(blockedMachines)+dlCount(deadMachines)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Nodes dead: %d", dlCount(deadMachines)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Nodes sick?: %d", listSickNodes(NULL)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Jobs running: %d", dlCount(runningJobs)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Jobs waiting: %d", sumPendingJobs()); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Jobs finished: %d", finishedJobCount); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Jobs crashed: %d", crashedJobCount); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Spokes free: %d", dlCount(freeSpokes)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Spokes busy: %d", dlCount(busySpokes)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Spokes dead: %d", dlCount(deadSpokes)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Active batches: %d", countActiveBatches()); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Total batches: %d", slCount(batchList)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Active users: %d", countActiveUsers()); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Total users: %d", slCount(userList)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Days up: %f", (now - 
startupTime)/(3600.0 * 24.0)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Version: %s", version); pmSendString(pm, rudpOut, buf); pmSendString(pm, rudpOut, ""); } void addSpoke() /* Start up a new spoke and add it to free list. */ { struct spoke *spoke; spoke = spokeNew(); if (spoke != NULL) { slAddHead(&spokeList, spoke); dlAddTail(freeSpokes, spoke->node); } } void killSpokes() /* Kill all spokes. */ { struct spoke *spoke, *next; for (spoke = spokeList; spoke != NULL; spoke = next) { next = spoke->next; dlRemove(spoke->node); spokeFree(&spoke); } } void startSpokes() /* Start default number of spokes. */ { int i; for (i=0; iname); if (hashLookup(machineHash, ms->name)) errAbort("machine list contains duplicate: %s", ms->name); struct machine *machine = doAddMachine(ms->name, ms->tempDir, ip, ms); hashStoreName(machineHash, ms->name); // TODO Add a command-line param for these that overrides default? /* use first machine in spec list as model node */ if (firstTime) { firstTime = FALSE; cpuUnit = 1; /* 1 CPU */ if (!optionExists("ramUnit")) ramUnit = ((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus; defaultJobCpu = 1; /* number of cpuUnits in default job usage */ /* number of ramUnits in default job usage, resolves to just 1 currently */ if (!optionExists("defaultJobRam")) defaultJobRam = (((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus) / ramUnit; } int c = 0, r = 0; readTotalMachineResources(machine, &c, &r); maxCpuInCluster = max(maxCpuInCluster, c); maxRamInCluster = max(maxRamInCluster, r); } lineFileClose(&lf); } struct existingResults /* Keep track of old results we need to integrate into */ { struct existingResults *next; char *fileName; /* Name of file this is in, not allocated here */ struct hash *hash; /* Hash keyed by ascii jobId indicated job results * already recorded. 
*/
    };

void existingResultsFree(struct existingResults **pEr)
/* Free up existing results structure */
{
struct existingResults *er = *pEr;
if (er != NULL)
    {
    freeHash(&er->hash);
    freez(pEr);
    }
}

void existingResultsFreeList(struct existingResults **pList)
/* Free list of existingResults */
{
struct existingResults *el, *next;
for (el = *pList; el != NULL; el = next)
    {
    next = el->next;
    existingResultsFree(&el);
    }
*pList = NULL;
}

void readResults(char *fileName, struct hash *hash)
/* Read jobId's of results into hash.  Missing file is a warning, not fatal,
 * since a batch may have no results yet. */
{
struct lineFile *lf = lineFileMayOpen(fileName, TRUE);
/* NOTE(review): assumes chopLine bounds parsing by ArraySize(row) so row[3]
 * cannot overflow -- confirm against the library's chopLine definition. */
char *row[3];
char *line;
int wordCount;
if (lf == NULL)
    {
    warn("Couldn't open results file %s", fileName);
    return;
    }
while (lineFileNext(lf, &line, NULL))
    {
    wordCount = chopLine(line, row);
    if (wordCount == 0 || row[0][0] == '#')
        continue;
    if (wordCount < 3)
        {
        warn("Short line %d of %s", lf->lineIx, lf->fileName);
        continue;
        }
    /* Field 3 is the jobId; a non-numeric field means the file is suspect. */
    if (!isdigit(row[2][0]))
        {
        warn("Expecting number field 3 line %d of %s", lf->lineIx, lf->fileName);
        break;
        }
    hashAdd(hash, row[2], NULL);
    }
lineFileClose(&lf);
}

struct existingResults *getExistingResults(char *fileName, struct hash *erHash,
    struct existingResults **pErList)
/* Get results from hash if we've seen them before, otherwise
 * read them in, save in hash, and return them. */
{
struct existingResults *er = hashFindVal(erHash, fileName);
if (er == NULL)
    {
    AllocVar(er);
    slAddHead(pErList, er);
    hashAddSaveName(erHash, fileName, er, &er->fileName);
    er->hash = newHashExt(18, FALSE);
    readResults(fileName, er->hash);
    }
return er;
}

void addRunningJob(struct runJobMessage *rjm, char *resultFile, struct machine *mach)
/* Add job that is already running (e.g. discovered during hub restart)
 * to the hub's queues and bookkeeping.
*/
{
if (dlCount(mach->jobs) > mach->machSpec->cpus)
    warn("%s seems to have more jobs running than it has cpus", mach->name);
else
    {
    struct job *job = jobNew(rjm->command, rjm->user, rjm->dir, rjm->in,
        rjm->out, rjm->cpus, rjm->ram, resultFile, FALSE);
    if (!job)
        return;
    struct batch *batch = job->batch;
    struct user *user = batch->user;
    job->id = atoi(rjm->jobIdString);
    ++batch->runningCount;
    ++user->runningCount;
    /* Batch has a running job, so it belongs on the old (started) list. */
    dlRemove(batch->node);
    dlAddTail(user->oldBatches, batch->node);
    dlAddTail(mach->jobs, job->jobNode);
    job->machine = mach;
    dlAddTail(runningJobs, job->node);
    dlRemove(mach->node);
    dlAddTail(readyMachines, mach->node);
    dlAddTail(hangJobs, job->hangNode);
    /* Treat "now" as the job's reference time since the true times are unknown. */
    mach->lastChecked = job->lastChecked = job->submitTime = job->startTime = job->lastClockIn = now;
    }
}

void pljErr(struct machine *mach, int no)
/* Print out error message in the middle of routine below.  no identifies
 * which receive/parse step failed. */
{
warn("%s: truncated listJobs response %d", mach->name, no);
}

void getExeOnly(char *command, char exe[256])
/* Extract executable file (not including path) from command line. */
{
/* Extract name of executable file with no path. */
char *dupeCommand = cloneString(command);
char *exePath = firstWordInLine(dupeCommand);
char exeFile[128], exeExt[64];
splitPath(exePath, NULL, exeFile, exeExt);
/* We cannot use sizeof(exe) because an array on a stack
 * is just a pointer, and so pointer-size is all that sizeof returns
 * for exe.
*/
safef(exe, 256, "%s%s", exeFile, exeExt);
freez(&dupeCommand);
}

void writeExistingResults(char *fileName, char *line, struct machine *mach,
    struct runJobMessage *rjm)
/* Parse status/uTime/sTime out of line and append a results-file record for
 * a job that finished while the hub was down. */
{
char err[512], exe[256];
int jobId = atoi(rjm->jobIdString);
char *status = nextWord(&line);
char *uTime = nextWord(&line);
char *sTime = nextWord(&line);

if (sTime == NULL)
    {
    warn("Bad line format in writeExistingResults for %s", mach->name);
    return;
    }

getExeOnly(rjm->command, exe);
fillInErrFile(err, jobId, mach->tempDir);
/* Intern the name so the stored pointer stays valid after this call. */
fileName = hashStoreName(stringHash, fileName);

writeResults(fileName, rjm->user, mach->name,
    jobId, exe, now, now,
    err, rjm->command,
    status, uTime, sTime);
}

boolean processListJobs(struct machine *mach,
    struct paraMessage *pm, struct rudp *ru,
    struct hash *erHash, struct existingResults **pErList,
    int *pRunning, int *pFinished)
/* Process response to list jobs message. Read jobs node is running and
 * has recently finished. Add running ones to job list. Add finished
 * ones to results file if necessary.
 *
 * Format of message is
 *     running count
 *     one line for each running job.
 *     recent count
 *     two lines for each recent job.
*/ { int running, recent, i, finCount = 0; struct runJobMessage rjm; char resultsFile[512]; struct paraMultiMessage pmm; /* ensure the multi-message response comes from the correct ip and has no duplicate msgs*/ pmmInit(&pmm, pm, pm->ipAddress.sin_addr); if (!pmmReceiveTimeOut(&pmm, ru, 2000000)) { warn("%s: no listJobs response", mach->name); return FALSE; } running = atoi(pm->data); for (i=0; idata, &rjm)) { pljErr(mach, 2); return FALSE; } snprintf(resultsFile, sizeof(resultsFile), "%s/%s", rjm.dir, "para.results"); addRunningJob(&rjm, resultsFile, mach); } *pRunning += running; if (!pmmReceiveTimeOut(&pmm, ru, 2000000)) { pljErr(mach, 3); return FALSE; } recent = atoi(pm->data); for (i=0; idata);; if (!parseRunJobMessage(startLine, &rjm)) { pljErr(mach, 5); freez(&startLine); return FALSE; } if (!pmmReceiveTimeOut(&pmm, ru, 2000000)) { pljErr(mach, 6); freez(&startLine); return FALSE; } /* Do not duplicate a result. Check if it already is in para.results */ safef(resultsFile, sizeof(resultsFile), "%s/%s", rjm.dir, "para.results"); er = getExistingResults(resultsFile, erHash, pErList); if (!hashLookup(er->hash, rjm.jobIdString)) { writeExistingResults(resultsFile, pm->data, mach, &rjm); ++finCount; } freez(&startLine); } *pFinished += finCount; return TRUE; } void checkForJobsOnNodes() /* Poll nodes and see if they have any jobs for us. 
*/ { struct machine *mach; int running = 0, finished = 0; struct hash *erHash = newHashExt(8, FALSE); /* A hash of existingResults */ struct existingResults *erList = NULL; logDebug("Checking for jobs already running on nodes"); for (mach = machineList; mach != NULL; mach = mach->next) { struct paraMessage pm; struct rudp *ru = rudpNew(rudpOut->socket); /* Get own resend timing */ logDebug("check for jobs on %s", mach->name); pmInitFromName(&pm, mach->name, paraNodePort); if (!pmSendString(&pm, ru, "listJobs")) { machineDown(mach); continue; } if (!processListJobs(mach, &pm, rudpOut, erHash, &erList, &running, &finished)) machineDown(mach); rudpFree(&ru); } /* Clean up time. */ existingResultsFreeList(&erList); hashFree(&erHash); needsPlanning = TRUE; /* Report results. */ logDebug("%d running jobs, %d jobs that finished while hub was down", running, finished); } void startHub(char *machineList) /* Do hub daemon - set up socket, and loop around on it until we get a quit. */ { struct sockaddr_in sai; char *line, *command; struct rudp *rudpIn = NULL; /* Note startup time. */ findNow(); startupTime = now; /* Find name and IP address of our machine. */ hubHost = getMachine(); if (optionExists("log")) logOpenFile("paraHub", optionVal("log", NULL)); else logOpenSyslog("paraHub", optionVal("logFacility", NULL)); logSetMinPriority(optionVal("logMinPriority", "info")); logInfo("starting paraHub on %s", hubHost); /* Set up various lists. */ hubMessageQueueInit(); stringHash = newHash(0); setupLists(); machineHash = newHash(0); startMachines(machineList); openJobId(); logInfo("next job ID is %d.", nextJobId); rudpOut = rudpMustOpen(); if (!optionExists("noResume")) checkForJobsOnNodes(); /* Initialize socket etc. */ ZeroVar(&sai); sai.sin_family = AF_INET; sai.sin_port = htons(paraHubPort); sai.sin_addr.s_addr = INADDR_ANY; rudpIn = rudpMustOpenBound(&sai); /* Start up daemons. 
*/
sockSuckStart(rudpIn);
startHeartbeat();
startSpokes();
logDebug("sockSuck,Heartbeat,Spokes have been started");

/* Bump up our priority to just shy of real-time. */
nice(-40);

/* Main event loop: dispatch on the first word of each queued message. */
for (;;)
    {
    struct paraMessage *pm = hubMessageGet();
    findNow();
    line = pm->data;
    logDebug("hub: %s", line);
    command = nextWord(&line);
    if (sameWord(command, "jobDone"))
        jobDone(line);
    else if (sameWord(command, "recycleSpoke"))
        recycleSpoke(line);
    else if (sameWord(command, "heartbeat"))
        processHeartbeat();
    else if (sameWord(command, "setPriority"))
        setPriorityAcknowledge(line, pm);
    else if (sameWord(command, "setMaxJob"))
        setMaxJobAcknowledge(line, pm);
    else if (sameWord(command, "resetCounts"))
        resetCountsAcknowledge(line, pm);
    else if (sameWord(command, "freeBatch"))
        freeBatchAcknowledge(line, pm);
    else if (sameWord(command, "flushResults"))
        flushResultsAcknowledge(line, pm);
    else if (sameWord(command, "showSickNodes"))
        showSickNodesAcknowledge(line, pm);
    else if (sameWord(command, "clearSickNodes"))
        clearSickNodesAcknowledge(line, pm);
    else if (sameWord(command, "addJob"))
        addJobAcknowledge(line, pm, 1);
    else if (sameWord(command, "addJob2"))
        addJobAcknowledge(line, pm, 2);
    else if (sameWord(command, "nodeDown"))
        nodeDown(line);
    else if (sameWord(command, "alive"))
        nodeAlive(line);
    else if (sameWord(command, "checkIn"))
        nodeCheckIn(line);
    else if (sameWord(command, "checkDeadNodesASAP"))
        checkDeadNodesASAP();
    else if (sameWord(command, "removeJob"))
        removeJobAcknowledge(line, pm);
    else if (sameWord(command, "chill"))
        chillBatch(line, pm);
    else if (sameWord(command, "ping"))
        respondToPing(pm);
    else if (sameWord(command, "addMachine"))
        addMachine(line);
    else if (sameWord(command, "removeMachine"))
        removeMachineAcknowledge(line, pm);
    else if (sameWord(command, "listJobs"))
        listJobs(pm, FALSE);
    else if (sameWord(command, "listJobsExtended"))
        listJobs(pm, TRUE);
    else if (sameWord(command, "listMachines"))
        listMachines(pm);
    else if (sameWord(command, "listUsers"))
        listUsers(pm);
    else if (sameWord(command, "listBatches"))
        listBatches(pm);
    else if (sameWord(command, "listSick"))
        listSickNodes(pm);
    else if (sameWord(command, "status"))
        status(pm);
    else if (sameWord(command, "pstat"))
        pstat(line, pm, FALSE);
    else if (sameWord(command, "pstat2"))
        pstat(line, pm, TRUE);
    else if (sameWord(command, "addSpoke"))
        addSpoke();
    else if (sameWord(command, "plan"))
        plan(pm);
    /* quit is checked outside the else-chain; the break skips pmFree(&pm),
     * which is harmless since we are shutting down. */
    if (sameWord(command, "quit"))
        break;
    pmFree(&pm);
    }
endHeartbeat();
killSpokes();
saveJobId();
#ifdef SOON
#endif /* SOON */
}

void fillInSubnet()
/* Parse subnet parameter if any into subnet variable.  Accepts either
 * -subnet or the legacy -subNet spelling. */
{
char *sns = optionVal("subnet", NULL);
if (sns == NULL)
    sns = optionVal("subNet", NULL);
netParseSubnet(sns, hubSubnet);
}

int main(int argc, char *argv[])
/* Process command line. */
{
optionInit(&argc, argv, optionSpecs);
if (argc < 2)
    usage();
if (optionExists("ramUnit"))
    {
    ramUnit = paraParseRam(optionVal("ramUnit", ""));
    if (ramUnit == -1)
        errAbort("Invalid RAM expression '%s' in '-ramUnit=' option", optionVal("ramUnit", ""));
    }
if (optionExists("defaultJobRam"))
    {
    defaultJobRam = optionInt("defaultJobRam", defaultJobRam);
    if (defaultJobRam < 1)
        errAbort("Invalid defaultJobRam specified in option -defaultJobRam=%d", defaultJobRam);
    }
jobCheckPeriod = optionInt("jobCheckPeriod", jobCheckPeriod);
machineCheckPeriod = optionInt("machineCheckPeriod", machineCheckPeriod);
initialSpokes = optionInt("spokes", initialSpokes);
fillInSubnet();
paraDaemonize("paraHub");
startHub(argv[1]);
return 0;
}