/******************************************************************************
 *
 * SSI: A Simple Socket Interface
 *
 * These routines perform some simple communications operations between
 * processes over TCP/IP.
 *
 * LIMITATIONS
 *
 *   o One process per UNIX host
 *   o Functions are either collective or master/slave (not point to point)
 *   o Message-passing code probably only works for short messages
 *     (should be fixed)
 *   o Length limit of STRLEN on hostnames and system commands (redefinable)
 *   o Maximum 16 nodes (redefinable)
 *   o Very little error checking is performed
 *   o Host names must be provided exactly as returned by gethostname()
 *
 * ENVIRONMENT VARIABLES
 *
 *   SSI_HOSTS : a list of hosts, separated by spaces, that compose the
 *               members of the host list
 *
 * FUNCTION DEFINITIONS
 *
 * SSISetDefaultPort(int port)
 *
 *   Set the port used for the initial socket to be established between root
 *   and non-root nodes. This is used for an initial handshaking process in
 *   which a new port number is established, so it will normally only be used
 *   for a short space of time. The argument may be overridden by the SSI_PORT
 *   environment variable.
 *
 * SSIConnect()
 *
 *   Connects the root node to each of the others.
 *
 * int
 * SSINNodes();
 *
 *   Return the number of processes.
 *
 * int
 * SSINodeID();
 *
 *   Return number (counting from 0) of the calling process.
 *
 * SSIListHosts(FILE *fp)
 *
 *   Lists the members of the set to file pointer fp.
 *
 * SSISlaveWait()
 *
 *   To be issued on slave nodes; causes them to await a command string from
 *   the root (SSIMasterExec) or an exit signal (see SSIMasterExit). On
 *   receipt of the latter, the function will return.
 *
 * int
 * SSIMasterExec(char *command)
 *
 *   To be issued on the root node while the slaves are waiting in
 *   SSISlaveWait. The command is passed to the slaves and executed using
 *   the "system" function on both root and slaves. The return codes from the
 *   nodes are compared, and if they match, the code is returned. If they
 *   don't match, SSI_RETCODE_MISMATCH is returned.
*
 * SSIMasterPutenv(char *string)
 *
 *   To be issued on the root node to modify the environment of both
 *   root and slaves. String should have the form "name=value" as specified
 *   in the putenv (3C) man page.
 *
 * SSIMasterExit(char *command)
 *
 *   To be issued on the root node while the slaves are waiting in
 *   SSISlaveWait. An exit signal is passed to the slaves, which then
 *   return from SSISlaveWait.
 *
 * SSIShutDown()
 *
 *   Close all socket connections.
 *
 * Paul Sherwood CCLRC 1996, stolen in large part from Royd Whittington.
 *
 ******************************************************************************/

extern char *sys_errlist[];

/*
 * NOTE(review): the header names of the #include directives below are
 * missing from this copy of the file - only the bare "#include" keyword is
 * left. They have to be restored from a pristine copy before the file can
 * be compiled; the directives are kept as found so nothing else is lost.
 */
#include
#ifdef AIX
#include
#endif
#include
#include
#include
#ifndef SGI
#include
#endif
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define DGRAMSIZE 1048576
#include "ssi.h"

#define MAXNODES 16  /* capacity of every per-node table below */
#define STRLEN 257   /* size of one host-name slot and of the command buffers */

/* host[i] is the name of node i, filled in by SSIConnect */
static char host[MAXNODES][STRLEN];

/* these parameters are used to set up a handshake channel */
static int hport=6999;
static int hsock[MAXNODES];
static struct sockaddr_in hserver[MAXNODES];

/* these are established after the handshaking phase */
static int port[MAXNODES];
static int sock[MAXNODES];
static struct sockaddr_in server[MAXNODES];

/* nhost: number of nodes, ihost: index of this process (see SSINNodes/SSINodeID) */
static int nhost, ihost;
/* when non-zero, progress messages are written to fpdebug */
static int debug=0;
/*
 * NOTE(review): stderr is not guaranteed to be a constant expression, so
 * these file-scope initializers may be rejected by some C libraries - confirm
 * on the target platforms.
 */
static FILE *fperr = stderr;
static FILE *fpdebug = stderr;
static char buffer[STRLEN];

/* local utility functions */
int SSIsplit_line(char *line, char *strings,int maxs, int maxc);
char *SSIstrst(char *p,char *q);

#define PACKET_SIZE 32768 /* Size that system buffers set to */

#ifdef TEST
/*
 * Test driver: connects all nodes, then node 0 runs "hostname" on every node
 * 30 times (2 seconds apart) and tells the slaves to exit, while every other
 * node sits in SSISlaveWait until that exit arrives.
 */
main()
{
  int i;
  SSISetDefaultPort(7001);
  if(SSIConnect())exit(-1);
  if(SSINodeID() == 0)SSIListHosts(stdout);
  if(SSINodeID() == 0){
    /* master process - would normally pass control to interpreter here
       this is just to check the connections */
    for(i=0;i<30;i++){
      printf("test %d\n",i);
      SSIMasterExec("hostname");
      sleep(2);
    }
    /* now shut down */
    SSIMasterExit();
    SSIShutDown();
    exit(0);
  } else {
    SSISlaveWait();
    SSIShutDown();
    exit(0);
  }
}

/* Test helper: release the slaves, shut the sockets down and exit with status 0. */
master_exit(){
  SSIMasterExit();
  SSIShutDown();
  exit(0);
}

/* Test helper: run command t on all nodes; the return code of SSIMasterExec is discarded. */
master_system(char *t){
  SSIMasterExec(t);
}
#endif

/********************************************************************************
 *
 * SSISetDefaultPort()
 *
 * Replaces the handshake port number (hport, initially 6999) with newport.
 * No value is returned.
 *
 ********************************************************************************/
SSISetDefaultPort(int newport){
  hport=newport;
}

/********************************************************************************
 *
 * SSIConnect()
 *
 * Reads the host list from the SSI_HOSTS environment variable and sets up
 * the sockets between node 0 and the other nodes.
 *
 * When SSI_HOSTS is not set, the local host name becomes the only table
 * entry (nhost = 1, ihost = 0) and the function returns 0 without opening
 * any socket.
 *
 * Master side, as far as it is visible: connect to the handshake port of a
 * host (retrying once a second while the connection is refused), read an int
 * port number from the handshake socket, close that socket, then connect to
 * the received port and store the result in sock[i] / port[i].
 *
 * Slave side: bind and listen on the handshake port, accept one connection,
 * bind a second socket to a system-chosen port, send that port number over
 * the handshake socket, accept the main connection into sock[0] and shut the
 * handshake socket down.
 *
 * Every visible return path returns 0; failures are reported through
 * SSIFatalError, which is defined outside this part of the file.
 *
 * NOTE(review): a stretch of this function is missing from this copy of the
 * file (see the marker in the body). How ihost is determined, the test that
 * selects the master branch and the loop headers around the connection code
 * are all in the missing text.
 *
 ********************************************************************************/
SSIConnect(){
  int i;
  int length;
  char *portname;
  char *hostlist;
  char *debvar;
  int hl_sock;   /* handshake socket (master: connecting, slave: listening) */
  int l_sock;    /* main socket (master: connecting, slave: listening) */
  fd_set fdread;
  int iret;
  int conn;
  int itry;
  int bound;
  int nodeport;
  int rval;
  int nbytes;
  int new_port;  /* port number received from the slave during the handshake */
  struct hostent *hp, *gethostbyname();

  /* establish table of hosts */
  hostlist= getenv("SSI_HOSTS");
  if(!hostlist){
    /* no host list: run as a single node */
    gethostname(host[0],STRLEN);
    nhost=1;
    ihost=0;
    printf(" null hostlist\n");
    return 0;
  } else {
    printf(" hostlist= %s\n",hostlist);
  }

  /* NOTE(review): a negative (error) result from SSIsplit_line is not checked here */
  nhost = SSIsplit_line(hostlist,host,MAXNODES,STRLEN);

      /*
       * NOTE(review): DAMAGED TEXT. Everything between the "for(i=0;i" loop
       * header and the "h_addr" argument on the next line is missing from
       * this copy of the file. The line is kept exactly as found; the
       * indentation from here on is a guess at the original nesting.
       */
      for(i=0;ih_addr, (char *)&hserver[i].sin_addr, hp->h_length);
      hserver[i].sin_port = htons(hport);
      if (connect(hl_sock, (struct sockaddr *)&hserver[i], sizeof hserver[i]) == 0) {
        conn=1;
      } else {
        /* we are going to have to try again */
        if (errno == ECONNREFUSED){
          /* probably the program at the other end is not there yet, we just need
             to wait, and try again */
          if(debug || (itry == SSI_MAX_CONN_ATTEMPTS))
            fprintf(fperr,"SSIConnect node 0: Failed to connect to %s after %d tries (%s)\n",host[i],itry,sys_errlist[errno]);
          if(itry == SSI_MAX_CONN_ATTEMPTS)
            SSIFatalError(__LINE__,"connect","too many tries",NULL);
          /*if(shutdown(hl_sock,2))SSIWarning(__LINE__,"shutdown","failed on master",NULL);*/
          /* NOTE(review): no increment of itry is visible in this branch - presumably it is in the missing text */
          if(close(hl_sock))SSIWarning(__LINE__,"close","failed on master",NULL);
          sleep(1);
        } else {
          SSIFatalError(__LINE__,"connect","unrecoverable error in handshake",NULL);
        }
      }
    }
    if(debug)fprintf(fpdebug,"SSIConnect node %d: Established handshake socket %d to host %d (%s)\n",0,hl_sock,i,host[i]);

    /* NOW receive the port for the job to use */
    if(SSIReadFromSocket(hl_sock, (char *) &new_port, sizeof(int)) == -1){
      SSIFatalError(__LINE__,"recv","SSIReadFromSocket failed",buffer);
    }
    /* NOTE(review): this message prints ihost as the sending node, not the loop index i */
    if(debug)fprintf(fpdebug, "SSIConnect node 0: got port %d from node %d:\n",new_port,ihost);

    /* We have finished with the handshake port, now shut it down */
    /*if(shutdown(hl_sock,2))SSIWarning(__LINE__,"shutdown","failed on master",NULL);*/
    if(close(hl_sock))SSIWarning(__LINE__,"close","failed on master",NULL);

    /* Now establish connect on this port - should be able to do this in one try
       as the other node should have issued a listen before sending the port number */
    if(debug)fprintf(fpdebug,"SSIConnect node 0: Attempting to connect to %d (%s) port #%d \n",i,host[i],new_port);

    /* create socket */
    l_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (l_sock < 0)SSIFatalError(__LINE__,"socket","creation failed on master",NULL);

    /* fill in the peer address and connect (no bind is performed on this side) */
    server[i].sin_family = AF_INET;
    hp = gethostbyname(host[i]);
    if (hp == 0)SSIFatalError(__LINE__,"gethostbyname","lookup failed for",host[i]);
    bcopy((char *)hp->h_addr, (char *)&server[i].sin_addr, hp->h_length);
    server[i].sin_port = htons(new_port);
    if (connect(l_sock, (struct sockaddr *)&server[i], sizeof server[i]) != 0) {
      SSIFatalError(__LINE__,"connect","unrecoverable error",NULL);
    }
    /* remember the established connection for node i */
    sock[i] = l_sock;
    port[i] = new_port;
    if(debug)fprintf(fpdebug,"SSIConnect node 0: Established socket %d to host %d (%s)\n",sock[i],i,host[i]);
    }
  } else {
    /* slave node listens for socket connection */
    /* Internet domain sockets from anywhere */
    hserver[0].sin_family = AF_INET;
    hserver[0].sin_addr.s_addr = INADDR_ANY;

    /* create sock */
    hl_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (hl_sock < 0)SSIFatalError(__LINE__,"socket","creation failed on slave",NULL);

    /* this is the handshake port */
    /* NOTE(review): assigned without htons(), whereas the master side connects
       to htons(hport); the two only agree where host and network byte order
       coincide - confirm and fix */
    hserver[0].sin_port = hport;
    itry = bound = 0;
    while (!bound) {
      /* sizeof server[0] is used for hserver[0]; both are struct sockaddr_in */
      if (bind(hl_sock, (struct sockaddr *)&hserver[0], sizeof server[0]) < 0){
        if (errno == EADDRINUSE){
          /* try waiting, there may be another pair of processes using this port
             they should clear off soon. */
          if(itry < SSI_MAX_BIND_ATTEMPTS){
            if(debug)fprintf(fpdebug,"SSIConnect node %d : wait to bind handshake port",ihost);
            sleep(1);
            itry++;
          }else{
            /* NOTE(review): host[i] here and in the slave-side messages below
               depends on the value i was left with by the missing code above */
            SSIFatalError(__LINE__,"bind","too many tries",host[i]);
          }
        } else {
          SSIFatalError(__LINE__,"bind","failed on slave",host[i]);
        }
      } else {
        bound = 1;
      }
    }
    if(debug)fprintf(fpdebug,"SSIConnect node %d: Handshake bind succeeded on socket %d\n",ihost, hl_sock);

    /* NOTE(review): the result of listen() is stored but never examined */
    iret = listen(hl_sock, 1);

    /* Now wait for the handshake connection */
    while(1) {
      FD_ZERO(&fdread);
      FD_SET(hl_sock, &fdread);
      /* NOTE(review): nfds is hard-coded to 64 and the result of select() is
         ignored; only the FD_ISSET test below decides whether to accept */
      select(64, &fdread, 0, 0, 0);
      if (!(FD_ISSET(hl_sock, &fdread)))continue;
      hsock[0] = accept(hl_sock, (struct sockaddr *)0, (int *)0);
      /*if(shutdown(hl_sock,2))SSIWarning(__LINE__,"shutdown","failed on slave",NULL);*/
      /* the listening socket is no longer needed once the connection is accepted
         (the warning text says "master" although this is the slave branch) */
      if(close(hl_sock))SSIWarning(__LINE__,"close","failed on master",NULL);
      break;
    }
    if(debug)fprintf(fpdebug,"SSIConnect node %d: Slave established handshake socket=%d\n",ihost, hsock[0]);

    /* Now repeat, this time we use a wildcard for the bind */
    /* Internet domain sockets from anywhere */
    server[0].sin_family = AF_INET;
    server[0].sin_addr.s_addr = INADDR_ANY;
    server[0].sin_port = 0;

    /* create socket */
    l_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (l_sock < 0)
      SSIFatalError(__LINE__,"socket","creation failed on slave",NULL);
    if (bind(l_sock, (struct sockaddr *)&server[0], sizeof server[0]) < 0)
      SSIFatalError(__LINE__,"bind","failed on slave",host[i]);

    /* get real port number to use - sin_port was set to 0 above, so the bound
       port is only known after asking the socket for its name */
    length = sizeof server[0];
    if (getsockname(l_sock, (struct sockaddr *)&server[0], &length) < 0)
      SSIFatalError(__LINE__,"getsockname","failed on slave",host[i]);
    port[0] = ntohs(server[0].sin_port);
    if(debug)fprintf(fpdebug, "SSIConnect node %d: Bound socket on port #%d\n", ihost, port[0]);

    /* Now send the new port to node 0 using the handshake socket
       (sent as a raw int in host byte order) */
    if(SSIWriteToSocket(hsock[0], (char *) &port[0], sizeof(int)) == -1)
      SSIFatalError(__LINE__,"send","SSIWriteToSocket failed",NULL);

    /* Listen for the main connection */
    iret = listen(l_sock, 1);
    while(1) {
      FD_ZERO(&fdread);
      FD_SET(l_sock, &fdread);
      select(64, &fdread, 0, 0, 0);
      if (!(FD_ISSET(l_sock, &fdread)))continue;
      sock[0] = accept(l_sock, (struct sockaddr *)0, (int *)0);
      /*if(shutdown(l_sock,2))SSIWarning(__LINE__,"shutdown","failed on slave",NULL);*/
      if(close(l_sock))SSIWarning(__LINE__,"close","failed on master",NULL);
      break;
    }

    /* We have now finished with the handshake socket */
    /* NOTE(review): hsock[0] is shut down but no close() follows in the visible code */
    shutdown(hsock[0],2);
    if(debug)fprintf(fpdebug, "SSIConnect node %d: Connection established socket=%d\n", ihost, sock[0]);

    /* now we sometimes have a problem - the next recv - in the brdcst failing - try waiting here */
    sleep(1);
  }
  return 0;
}

/********************************************************************************
 *
 * SSINodeID(): node id
 *
 * Returns ihost, the index of the calling process in the host table.
 *
 ********************************************************************************/
int SSINodeID(){
  return ihost;
}

/********************************************************************************
 *
 * SSINNodes: number of nodes
 *
 * Returns nhost, the number of entries in the host table.
 *
 ********************************************************************************/
int SSINNodes(){
  return nhost;
}

/********************************************************************************
 *
 * SSIBrdcst: root zero broadcast
 *
 * Called elsewhere in this file as SSIBrdcst(STRLEN, com), with node 0
 * supplying the command and the other nodes receiving it.
 *
 * NOTE(review): DAMAGED TEXT. Everything after the "for(i=1;i" loop header
 * of SSIBrdcst up to the "sock[i] >= 0" test of what is, judging by its
 * messages, SSIShutDown is missing from this copy of the file. The two
 * functions are therefore fused below; the text is kept exactly as found.
 *
 * The visible remainder shuts down (mode 2) sock[1..] on node 0 or sock[0]
 * on any other node, skipping negative descriptors, and exits with status -1
 * if a shutdown call fails.
 *
 ********************************************************************************/
SSIBrdcst(int nbytes,void *buff){
  int rval,i;
  if(ihost == 0){
    for(i=1;i= 0){
        if(shutdown(sock[i],2)){
          fprintf(fperr,"SSIShutDown node %d: problem in shutdown: %s\n",ihost,sys_errlist[errno]);
          exit(-1);
        }
      }
    }
  }else{
    if(sock[0] >= 0){
      if(shutdown(sock[0],2)){
        fprintf(fperr,"SSIShutDown node %d: problem in shutdown: %s\n",ihost,sys_errlist[errno]);
        exit(-1);
      }
    }
  }
  if(debug)fprintf(fpdebug,"SSIShutDown node %d: complete\n",ihost);
  if(debug)fflush(fpdebug);
}
/******************************************************************************** * * SSIListHosts: list hostnames * ********************************************************************************/ void SSIListHosts(FILE *fp){ int i; fprintf(fp,"Node Host ==================\n"); for(i=0;i 0) { again: if ( (nread = recv(sock, buf, (int) lenbuf, 0)) <= 0) { if (errno == EINTR) { goto again; } else { (void) fprintf(stderr,"sock=%d, pid=%ld, nread=%d, len=%ld errno=%d err=%s\n", sock, ihost, nread, lenbuf, errno, sys_errlist[errno] ); (void) fflush(stderr); status = -1; break; } } buf += nread; lenbuf -= nread; } return status; } int SSIWriteToSocket(int sock, char *buf, long lenbuf) /* Write to the socket in packets of PACKET_SIZE bytes */ { int status = lenbuf; int nsent, len; while (lenbuf > 0) { len = (lenbuf > PACKET_SIZE) ? PACKET_SIZE : lenbuf; nsent = send(sock, buf, (int) len, 0); if(nsent == 0 || debug) fprintf(fpdebug, "SSIWriteToSocket : warning node %d, 0-length send on sock %d\n",ihost,sock); if (nsent < 0) { /* This is bad news */ (void) fprintf(stderr,"sock=%d, pid=%ld, nsent=%d, len=%ld\n", sock, ihost, nsent, lenbuf); (void) fflush(stderr); status = -1; break; } buf += nsent; lenbuf -= nsent; } return status; } /******************************************************************** * * SSIstrst : index the occurence of q within p returning string p * ********************************************************************/ char *SSIstrst(p,q) char *p, *q; { char *p1,*q1,*p2; for(p1 = p ;*p1 != '\0' ;p1++){ for(p2 = p1, q1 = q; (*p2 == *q1) && (*q1 != '\0') ;q1++,p2++) ; if(*q1 == '\0')return p1; } return NULL; } /************************************************************** * split a line into strings * return the number of strings **************************************************************/ int SSIsplit_line(line,strings,maxs, maxc) char *line; char *strings; int maxs,maxc; { int ns, nc, instring, i; char *p; /* parse into fields, = is a field on its 
own*/ ns = -1; nc = 0; instring = 0; for(p = line; *p != '\0' && *p != '\n' ;p++) { if(*p == ' ' || *p == '\t'){ if(instring){ *(strings+maxc*ns+nc) = '\0'; nc = 0; instring = 0; } } else if(*p == '"'){ if(instring) { printf("error \" occurred inside a quoted string\n"); return -1; }else { nc = (int) (SSIstrst((p+1),"\"")-p-1); strncpy(strings+(++ns)*maxc,p+1,nc); *(strings+ns*maxc+nc) = '\0'; p = p+nc+1; nc = 0; } } else if(*p == '='){ if(instring) { *(strings+ns*maxc+nc) = '\0'; instring = 0; } *(strings+(++ns)*maxc) = '='; *(strings+ns*maxc) = '\0'; nc = 0; } else { /* other character */ if(!instring) { if(ns > maxs - 2)return -2; /* too many strings to store */ instring = 1; ns++; } if(nc < maxc -1){ *(strings+ns*maxc+nc++) = *p; } else{ printf("WARNING string truncated to %d characters\n",maxc); } } } /* end of loop over characters */ if(instring)*(strings+ns*maxc+nc) = '\0'; ns++; /* for(i = 0;i < ns; i++) printf("string %s\n",strings + i*maxc); */ return ns; } /******************************************************************************** * * SSISlaveWait() : enter master/slave section on slave * ********************************************************************************/ int SSISlaveWait() { char com[STRLEN]; int iret, rval; char *temp; while(1){ /* receive command */ SSIBrdcst(STRLEN,com); switch(com[0]){ case ':': switch (com[1]){ case 'e': /* SSIMasterExit was called */ return 0; break; case 'R': /* SSIMasterGamess was called */ gamess_(&iret); if(SSIWriteToSocket(sock[0], (char *) &iret, sizeof(int)) == -1) SSIFatalError(__LINE__,"send","error sending return code",NULL); break; case 'p': /* SSIMasterPutenv was called */ if(debug)fprintf(fpdebug,"SSISlaveWait node %d: putenv %s\n",ihost,com+3); temp = malloc(sizeof(char)*(strlen(com+3) + 1)); strcpy(temp,com+3); putenv(temp); break; default: fprintf(fperr,"SSISlaveWait node %d: ERROR bad : key %s\n",ihost,com); SSIShutDown(); exit(-1); } break; default: /* execute it */ 
if(debug)fprintf(fpdebug,"SSISlaveWait node %d: executing %s\n",ihost,com); iret = system(com); iret = WEXITSTATUS(iret); if(debug)fprintf(fpdebug,"SSISlaveWait node %d: returned %d\n",ihost,iret); /*send back return code */ if(SSIWriteToSocket(sock[0], (char *) &iret, sizeof(int)) == -1) SSIFatalError(__LINE__,"send","error sending return code",NULL); break; } if(debug)fflush(fpdebug); } } /******************************************************************************** * * SSIMasterExec() : Execute a command on all nodes (in master/slave section) * ********************************************************************************/ int SSIMasterExec(char *com1) { char com[STRLEN]; int iret,i,rval; int ret_array[MAXNODES]; if(strlen(com1) >= STRLEN){ fprintf(fperr,"SSIMasterExec: command string is too long\n"); fflush(fperr); return -1; } strcpy(com,com1); if(debug)fprintf(fpdebug,"SSIMasterExec node 0: Broadcasting %s\n",com); SSIBrdcst(STRLEN,com); /* execute command on root node */ if(debug)fprintf(fpdebug,"SSIMasterExec node 0: Executing %s\n",com); iret = system(com); iret = WEXITSTATUS(iret); if(debug)fprintf(fpdebug,"SSIMasterExec node 0: Call returned %d\n",iret); /* collect return codes */ ret_array[0] = iret; for(i=1;i