/* pipeline.c - create a process pipeline that can be used for reading or * writing */ #include "pipeline.h" #include "common.h" #include "sqlNum.h" #include "dystring.h" #include "errabort.h" #include "portable.h" #include "linefile.h" #include #include #include #include enum procState /* process state, in order of transition */ { procStateNew, // plProc object created procStateRun, // proccess running procStateDone // process finished (ok or failed) }; struct plProc /* A single process in a pipeline */ { struct plProc *next; /* order list of processes */ struct pipeline *pl; /* pipeline we are associated with */ char **cmd; /* null-terminated command for this process */ pid_t pid; /* pid for process, -1 if not running */ enum procState state; /* state of process */ int status; /* status from wait */ }; struct pipeline /* Object for a process pipeline and associated open file. Pipeline process * consist of a process group leader and then all of the child process. The * group leader does no work, just wait on processes to complete and report * errors to the top level process. This object is create in the calling * process, and then passed down, but not shared, via forks. */ { struct plProc *procs; /* list of processes */ int numRunning; /* number of processes running */ pid_t groupLeader; /* process group id, or -1 if not set. This is pid of group leader */ char *procName; /* name to use in error messages. */ int pipeFd; /* fd of pipe to/from process, -1 if none */ unsigned options; /* options */ FILE* pipeFh; /* optional stdio around pipe */ char* stdioBuf; /* optional stdio buffer */ struct lineFile *pipeLf; /* optional lineFile around pipe */ }; /* file buffer size */ #define FILE_BUF_SIZE 64*1024 static int pipeCreate(int *writeFd) /* create a pipe or die, return readFd */ { int pipeFds[2]; if (pipe(pipeFds) < 0) errnoAbort("can't create pipe"); *writeFd = pipeFds[1]; return pipeFds[0]; } static void safeClose(int *fdPtr) /* Close with error checking. *fdPtr == -1 indicated already closed */ { int fd = *fdPtr; if (fd != -1) { if (close(fd) < 0) errnoAbort("close failed on fd %d", fd); *fdPtr = -1; } } static void closeNonStdDescriptors(void) /* close non-standard file descriptors */ { long maxFd = sysconf(_SC_OPEN_MAX); if (maxFd < 0) maxFd = 4096; // shouldn't really happen int fd; for (fd = STDERR_FILENO+1; fd < maxFd; fd++) close(fd); } static char* joinCmd(char **cmd) /* join an cmd vector into a space separated string */ { struct dyString *str = dyStringNew(512); int i; for (i = 0; cmd[i] != NULL; i++) { if (i > 0) dyStringAppend(str, " "); dyStringAppend(str, cmd[i]); } return dyStringCannibalize(&str); } static char* joinCmds(char ***cmds) /* join an cmds vetor into a space and pipe separated string */ { struct dyString *str = dyStringNew(512); int i, j; for (i = 0; cmds[i] != NULL; i++) { if (i > 0) dyStringAppend(str, " | "); for (j = 0; cmds[i][j] != NULL; j++) { if (j > 0) dyStringAppend(str, " "); dyStringAppend(str, cmds[i][j]); } } return dyStringCannibalize(&str); } static char** cloneCmdVector(char **cmd) /* make a copy of the vector */ { int i, cmdLen = 0; for (i = 0; cmd[i] != NULL; i++) cmdLen++; char **cmd2 = needMem((cmdLen+1)*sizeof(char*)); for (i = 0; i < cmdLen; i++) cmd2[i] = cloneString(cmd[i]); cmd2[cmdLen] = NULL; return cmd2; } static struct plProc* plProcNew(char **cmd, struct pipeline *pl) /* create a new plProc object for a command. */ { struct plProc* proc; AllocVar(proc); proc->pl = pl; proc->cmd = cloneCmdVector(cmd); proc->state = procStateNew; return proc; } static void plProcFree(struct plProc *proc) /* free a plProc object. */ { int i; for (i = 0; proc->cmd[i] != NULL; i++) freeMem(proc->cmd[i]); freeMem(proc->cmd); freeMem(proc); } static void plProcStateTrans(struct plProc *proc, enum procState newState) /* do state transition for process changing it to a new state */ { // States must transition in order. New state must immediately follow the // current state. if (newState != proc->state+1) errAbort("invalid state transition: %d -> %d", proc->state, newState); proc->state = newState; } static void plProcSetup(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd) /* setup signal, error handling, and file descriptors after fork */ { /* treat a closed pipe as an EOF rather than getting SIGPIPE */ struct sigaction sigAct; ZeroVar(&sigAct); sigAct.sa_handler = SIG_IGN; if (sigaction(SIGPIPE, &sigAct, NULL) != 0) errnoAbort("failed to set SIGPIPE to SIG_IGN"); /* child, first setup stdio files */ if (stdinFd != STDIN_FILENO) { if (dup2(stdinFd, STDIN_FILENO) < 0) errnoAbort("can't dup2 to stdin"); } if (stdoutFd != STDOUT_FILENO) { if (dup2(stdoutFd, STDOUT_FILENO) < 0) errnoAbort("can't dup2 to stdout"); } if (stderrFd != STDERR_FILENO) { if (dup2(stderrFd, STDERR_FILENO) < 0) errnoAbort("can't dup2 to stderr"); } closeNonStdDescriptors(); } static void plProcExecChild(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd) /* child part of process startup. */ { plProcSetup(proc, stdinFd, stdoutFd, stderrFd); execvp(proc->cmd[0], proc->cmd); errnoAbort("exec failed: %s", proc->cmd[0]); } static void plProcMemWrite(struct plProc* proc, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* implements child process to write memory buffer to pipeline after * fork */ { plProcSetup(proc, STDIN_FILENO, stdoutFd, stderrFd); ssize_t wrCnt = write(STDOUT_FILENO, otherEndBuf, otherEndBufSize); if (wrCnt < 0) errnoAbort("pipeline input buffer write failed"); else if (wrCnt != otherEndBufSize) errAbort("pipeline input buffer short write %lld, expected %lld", (long long)wrCnt, (long long)otherEndBufSize); else { close(STDOUT_FILENO); exit(0); } } static void plProcHandleTerminate(struct plProc* proc, int status) /* handle one of the processes terminating, save exit status */ { proc->status = status; if (WIFSIGNALED(proc->status)) errAbort("process terminated on signal %d: \"%s\" in pipeline \"%s\"", WTERMSIG(proc->status), joinCmd(proc->cmd), proc->pl->procName); assert(WIFEXITED(proc->status)); if (WEXITSTATUS(proc->status) != 0) { // only print an error message if aborting if (!(proc->pl->options & pipelineNoAbort)) fprintf(stderr, "process exited with %d: \"%s\" in pipeline \"%s\"\n", WEXITSTATUS(proc->status), joinCmd(proc->cmd), proc->pl->procName); exit(WEXITSTATUS(proc->status)); // pass back exit code } proc->pid = -1; plProcStateTrans(proc, procStateDone); } static struct pipeline* pipelineNew(char ***cmds, unsigned options) /* create a new pipeline object. Doesn't start processes */ { static char *memPseudoCmd[] = {"[mem]", NULL}; struct pipeline *pl; int iCmd; AllocVar(pl); pl->groupLeader = -1; pl->pipeFd = -1; pl->options = options; pl->procName = joinCmds(cmds); if (cmds[0] == NULL) errAbort("no commands in pipeline"); if (options & pipelineMemInput) { /* add proc for forked process to write memory to pipeline */ slAddTail(&pl->procs, plProcNew(memPseudoCmd, pl)); } for(iCmd = 0; cmds[iCmd] != NULL; iCmd++) slAddTail(&pl->procs, plProcNew(cmds[iCmd], pl)); return pl; } void pipelineFree(struct pipeline **plPtr) /* free a pipeline object */ { struct pipeline *pl = *plPtr; if (pl != NULL) { struct plProc *proc = pl->procs; while (proc != NULL) { struct plProc *delProc = proc; proc = proc->next; plProcFree(delProc); } freez(&pl->procName); freez(&pl->stdioBuf); freez(plPtr); } } static struct plProc *pipelineFindProc(struct pipeline *pl, pid_t pid) /* find a plProc by pid */ { struct plProc *proc; for (proc = pl->procs; proc != NULL; proc = proc->next) if (proc->pid == pid) return proc; errAbort("pid not found in pipeline: %d", (int)pid); return 0; // never reached } static void execProcChild(struct pipeline* pl, struct plProc *proc, int procStdinFd, int procStdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* handle child process setup after fork. This does not return */ { if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) errnoAbort("error ignoring SIGPIPE"); if (setpgid(getpid(), pl->groupLeader) != 0) errnoAbort("error from setpgid(%d, %d)", getpid(), pl->groupLeader); if (otherEndBuf != NULL) plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize); else plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd); } static int pipelineExecProc(struct pipeline* pl, struct plProc *proc, int prevStdoutFd, int stdinFd, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* start a process in the pipeline, return the stdout fd of the process */ { /* determine stdin/stdout to use */ int procStdinFd, procStdoutFd; if (proc == pl->procs) procStdinFd = stdinFd; /* first process in pipeline */ else procStdinFd = prevStdoutFd; if (proc->next == NULL) procStdoutFd = stdoutFd; /* last process in pipeline */ else prevStdoutFd = pipeCreate(&procStdoutFd); /* start process */ if ((proc->pid = fork()) < 0) errnoAbort("can't fork"); if (proc->pid == 0) execProcChild(pl, proc, procStdinFd, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize); /* record that we did this */ plProcStateTrans(proc, procStateRun); pl->numRunning++; /* don't leave intermediate pipes open in parent */ if (proc != pl->procs) safeClose(&procStdinFd); if (proc->next != NULL) safeClose(&procStdoutFd); return prevStdoutFd; } static void pipelineGroupExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* Start all processes in a pipeline, stdinFd and stdoutFd are the ends of * the pipeline, stderrFd is applied to all processed */ { struct plProc *proc; int prevStdoutFd = -1; for (proc = pl->procs; proc != NULL; proc = proc->next) { prevStdoutFd = pipelineExecProc(pl, proc, prevStdoutFd, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize); otherEndBuf = NULL; /* only for first process (read pipes) */ otherEndBufSize = 0; } } static void waitOnOne(struct pipeline *pl) /* wait on one process to finish */ { int status; pid_t pid = waitpid(-pl->groupLeader, &status, 0); if (pid < 0) errnoAbort("waitpid failed"); plProcHandleTerminate(pipelineFindProc(pl, pid), status); pl->numRunning--; assert(pl->numRunning >= 0); } static void groupWait(struct pipeline *pl) /* Wait for pipeline to complete */ { /* wait on all processes to complete */ while (pl->numRunning > 0) waitOnOne(pl); } static void groupLeaderRun(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* group leader process */ { pl->groupLeader = getpid(); if (setpgid(pl->groupLeader, pl->groupLeader) != 0) errnoAbort("error from child setpgid(%d, %d)", pl->groupLeader, pl->groupLeader); pipelineGroupExec(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize); // only keep stderr open close(STDIN_FILENO); close(STDOUT_FILENO); closeNonStdDescriptors(); groupWait(pl); exit(0); } static int groupLeaderWait(struct pipeline *pl) /* Wait for group leader to complete. If pipelineNoAbort was specified, return * the exit code of the first group process exit non-zero, or zero if none * failed. */ { int status; pid_t pid = waitpid(-pl->groupLeader, &status, 0); if (pid < 0) errnoAbort("waitpid failed"); if (WIFSIGNALED(status)) errAbort("process pipeline terminated on signal %d", WTERMSIG(status)); assert(WIFEXITED(status)); if ((WEXITSTATUS(status) != 0) && !(pl->options & pipelineNoAbort)) errAbort("pipeline exited with %d", WEXITSTATUS(status)); return WEXITSTATUS(status); } static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* Fork the group leader, which then launches all all processes in a pipeline, * stdinFd and stdoutFd are the ends of the pipeline, stderrFd is applied to * all processes, including group leader */ { assert(pl->groupLeader < 0); // should not be set if ((pl->groupLeader = fork()) < 0) errnoAbort("can't fork"); if (pl->groupLeader == 0) { groupLeaderRun(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize); exit(1); // doesn't return to here } else { // parent also must also setpgid to prevent race condition if (setpgid(pl->groupLeader, pl->groupLeader) != 0) errnoAbort("error from parent setpgid(%d, %d)", pl->groupLeader, pl->groupLeader); } } static int openRead(char *fname) /* open a file for reading */ { int fd = open(fname, O_RDONLY); if (fd < 0) errnoAbort("can't open for read access: %s", fname); return fd; } static int openWrite(char *fname, boolean append) /* open a file for write access */ { int flags = O_WRONLY|O_CREAT; if (append) flags |= O_APPEND; else flags |= O_TRUNC; int fd = open(fname, flags, 0666); if (fd < 0) errnoAbort("can't open for write access: %s", fname); return fd; } static void pipelineStartRead(struct pipeline *pl, int stdinFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* start a read pipeline */ { int pipeWrFd; pl->pipeFd = pipeCreate(&pipeWrFd); pipelineExec(pl, stdinFd, pipeWrFd, stderrFd, otherEndBuf, otherEndBufSize); safeClose(&pipeWrFd); } static void pipelineStartWrite(struct pipeline *pl, int stdoutFd, int stderrFd) /* start a write pipeline */ { int pipeRdFd = pipeCreate(&pl->pipeFd); pipelineExec(pl, pipeRdFd, stdoutFd, stderrFd, NULL, 0); safeClose(&pipeRdFd); } static void checkOpts(unsigned opts) /* check option set for consistency */ { if (((opts & (pipelineRead|pipelineWrite)) == 0) || ((opts & (pipelineRead|pipelineWrite)) == (pipelineRead|pipelineWrite))) errAbort("must specify one of pipelineRead or pipelineWrite to pipelineOpen"); if ((opts & pipelineAppend) && ((opts & pipelineWrite) == 0)) errAbort("pipelineAppend is valid only in conjunction with pipelineWrite"); } struct pipeline *pipelineOpenFd(char ***cmds, unsigned opts, int otherEndFd, int stderrFd) /* Create a pipeline from an array of commands. See pipeline.h for * full documentation. */ { struct pipeline *pl; checkOpts(opts); pl = pipelineNew(cmds, opts); if (opts & pipelineRead) pipelineStartRead(pl, otherEndFd, stderrFd, NULL, 0); else pipelineStartWrite(pl, otherEndFd, stderrFd); return pl; } struct pipeline *pipelineOpen(char ***cmds, unsigned opts, char *otherEndFile, char *stderrFile) /* Create a pipeline from an array of commands. See pipeline.h for * full documentation */ { int otherEndFd; int stderrFd = (stderrFile == NULL) ? STDERR_FILENO : openWrite(stderrFile, FALSE); checkOpts(opts); boolean append = ((opts & pipelineAppend) != 0); if (opts & pipelineRead) otherEndFd = (otherEndFile == NULL) ? STDIN_FILENO : openRead(otherEndFile); else otherEndFd = (otherEndFile == NULL) ? STDOUT_FILENO : openWrite(otherEndFile, append); struct pipeline *pl = pipelineOpenFd(cmds, opts, otherEndFd, stderrFd); safeClose(&otherEndFd); if (stderrFile != NULL) safeClose(&stderrFd); return pl; } struct pipeline *pipelineOpenMem(char ***cmds, unsigned opts, void *otherEndBuf, size_t otherEndBufSize, int stderrFd) /* Create a pipeline from an array of commands, with the pipeline input/output * in a memory buffer. See pipeline.h for full documentation. Currently only * input to a read pipeline is supported */ { struct pipeline *pl; checkOpts(opts); if (opts & pipelineWrite) errAbort("pipelineOpenMem only supports read pipelines at this time"); opts |= pipelineMemInput; pl = pipelineNew(cmds, opts); pipelineStartRead(pl, STDIN_FILENO, stderrFd, otherEndBuf, otherEndBufSize); return pl; } struct pipeline *pipelineOpenFd1(char **cmd, unsigned opts, int otherEndFd, int stderrFd) /* like pipelineOpenFd(), only takes a single command */ { char **cmds[2]; cmds[0] = cmd; cmds[1] = NULL; return pipelineOpenFd(cmds, opts, otherEndFd, stderrFd); } struct pipeline *pipelineOpen1(char **cmd, unsigned opts, char *otherEndFile, char *stderrFile) /* like pipelineOpen(), only takes a single command */ { char **cmds[2]; cmds[0] = cmd; cmds[1] = NULL; return pipelineOpen(cmds, opts, otherEndFile, stderrFile); } struct pipeline *pipelineOpenMem1(char **cmd, unsigned opts, void *otherEndBuf, size_t otherEndBufSize, int stderrFd) /* like pipelineOpenMem(), only takes a single command */ { char **cmds[2]; cmds[0] = cmd; cmds[1] = NULL; return pipelineOpenMem(cmds, opts, otherEndBuf, otherEndBufSize, stderrFd); } char *pipelineDesc(struct pipeline *pl) /* Get the description of a pipeline for use in error messages */ { return pl->procName; } int pipelineFd(struct pipeline *pl) /* Get the file descriptor for a pipeline */ { return pl->pipeFd; } FILE *pipelineFile(struct pipeline *pl) /* Get a FILE object wrapped around the pipeline. Do not close the FILE, is * owned by the pipeline object. A FILE is created on first call to this * function. Subsequent calls return the same FILE.*/ { if (pl->pipeFh == NULL) { /* create FILE* on first access */ char *mode = (pl->options & pipelineRead) ? "r" : "w"; if (pl->pipeLf != NULL) errAbort("can't call pipelineFile after having associated a lineFile with a pipeline"); pl->pipeFh = fdopen(pl->pipeFd, mode); if (pl->pipeFh == NULL) errnoAbort("fdopen failed for: %s", pl->procName); pl->stdioBuf = needLargeMem(FILE_BUF_SIZE); setvbuf(pl->pipeFh, pl->stdioBuf, _IOFBF, FILE_BUF_SIZE); } return pl->pipeFh; } struct lineFile *pipelineLineFile(struct pipeline *pl) /* Get a lineFile object wrapped around the pipeline. Do not close the * lineFile, is owned by the pipeline object. A lineFile is created on first * call to this function. Subsequent calls return the same object.*/ { if (pl->pipeLf == NULL) { /* create line on first acess */ if (pl->pipeFh != NULL) errAbort("can't call pipelineLineFile after having associated a FILE with a pipeline"); if (pl->options & pipelineWrite) errAbort("can't associated a lineFile with a write pipeline"); pl->pipeLf = lineFileAttach(pipelineDesc(pl), TRUE, pl->pipeFd); } return pl->pipeLf; } static void closePipelineFile(struct pipeline *pl) /* close a pipeline with a FILE associated with it */ { if (pl->options & pipelineWrite) { fflush(pl->pipeFh); if (ferror(pl->pipeFh)) errAbort("write failed to pipeline: %s ", pl->procName); } else if (ferror(pl->pipeFh)) errAbort("read failed from pipeline: %s ", pl->procName); if (fclose(pl->pipeFh) == EOF) errAbort("close failed on pipeline: %s ", pl->procName); pl->pipeFh = NULL; } static void closePipeline(struct pipeline *pl) /* Close the pipe file */ { if (pl->pipeFh != NULL) closePipelineFile(pl); else if (pl->pipeLf != NULL) lineFileClose(&pl->pipeLf); else { if (close(pl->pipeFd) < 0) errAbort("close failed on pipeline: %s ", pl->procName); } pl->pipeFd = -1; } int pipelineWait(struct pipeline *pl) /* Wait for processes in a pipeline to complete; normally aborts if any * process exists non-zero. If pipelineNoAbort was specified, return the exit * code of the first process exit non-zero, or zero if none failed. */ { /* must close before waiting to so processes get pipe EOF */ closePipeline(pl); return groupLeaderWait(pl); } void pipelineDumpCmds(char ***cmds) /* Dump out pipeline-formatted commands to stdout for debugging. */ { char **cmd; boolean first = TRUE; while ((cmd = *cmds++) != NULL) { char *word; if (first) first = FALSE; else printf("| "); while ((word = *cmd++) != NULL) printf("%s ", word); } printf("
\n"); } /* * Local Variables: * c-file-style: "jkent-c" * End: */