/* * Copyright (c) 1996 Gunther Schadow. All rights reserved. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #pragma implementation #include "PipeSocket.h" #include #include #include #include #include #include #include #include #ifdef PROTOGEN # include "exception.h" # include "logfile.h" #else # include # include # define ERROR(fmt, args...) { fprintf(stderr, fmt "\n" , ## args); exit(1); } # define ERROS(fmt, args...) { fprintf(stderr, fmt ": " , ## args); \ perror(""); exit(1); } # define FATAL(fmt, args...) { fprintf(stderr, fmt "\n" , ## args); abort(); } # define WARNING(fmt, args...) { fprintf(stderr, fmt "\n" , ## args); } #endif #include /********************************************************************** * argument parsing * * [[rw][jtk] ,] command **********************************************************************/ void PipeSocket::parse_args(const char *s, int l) { if(l == -1) l = strlen(s); const char *name_p = NULL; int name_l = 0; const char *flags_p = NULL; int flags_l = 0; if(l == 0) goto finish; while(l > 0 && isspace(*s)) { s++; l--; } flags_p = s; while(l > 0 && *s != ',') { s++; l--; flags_l++; } if(l == 0) goto finish; else { s++; l--; } while(l > 0 && isspace(*s)) { s++; l--; } name_p = s; while(l > 0) { s++; l--; name_l++; } finish: if(flags_p == NULL || name_p == NULL) ERROR("usage: pipe( [rw][jtk] , )"); // remove trailing spaces from command const char *p = name_p + name_l; while(name_l > 0 && isspace(*--p)) { name_l--; } // set the command _command = new char[name_l + 1]; memcpy(_command, name_p, name_l); _command[name_l] = 0; for(int i = 0; i < flags_l; i++) { switch(char f = tolower(flags_p[i])) { case 'r': _rwmode |= recv_mode; break; case 'w': _rwmode |= send_mode; break; case 'j': _pmode |= ProcInfo::join; break; case 't': _pmode |= ProcInfo::term; break; case 'k': _pmode |= ProcInfo::kill; break; default: WARNING("illegal pipe flag %c [rw][jtk]", f); } } } /********************************************************************** * ctor, dtor, and copy methods **********************************************************************/ PipeSocket::PipeSocket(const char *, const char *param, size_t parlen) : Socket(), _sigpipe(SIGPIPE) { _pd = -1; _child = NULL; _rwmode = 0; _pmode = 0; parse_args(param, parlen); } PipeSocket::PipeSocket() : Socket(), _sigpipe(SIGPIPE) { _pd = -1; _child = NULL; _rwmode = 0; _pmode = 0; _command = NULL; } PipeSocket::~PipeSocket() { if(_pd != -1) ::close(_pd); if(_command != NULL) delete [] _command; if(_child != NULL) delete _child; } Socket *PipeSocket::ctor(const char *address, const char *param, size_t parlen) { return new PipeSocket(address, param, parlen); } Socket *PipeSocket::ctor() { return new PipeSocket(); } /********************************************************************** * socket methods **********************************************************************/ static void vinit(va_list ap) { int *sv = va_arg(ap, int*); int mode = va_arg(ap, int); const char *cmd = va_arg(ap, const char*); ::close(sv[0]); if(mode & Socket::send_mode) { if(dup2(sv[1], STDIN_FILENO) == -1) { LOGERROS("dup2-0"); _exit(1); } } if(mode & Socket::recv_mode) { if(dup2(sv[1], STDOUT_FILENO) == -1) { LOGERROS("dup2-1"); _exit(1); } } ::close(sv[1]); execl(_PATH_BSHELL, "sh", "-c", cmd, NULL); LOGERROS("execl returned"); _exit(1); } bool PipeSocket::connect(flags_t mode) { if(_command == NULL) ERROR("no pipe command"); if(_pd != -1 || _child != NULL) close(); int sv[2]; if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) ERROS("socketpair"); _child = new Proc(_pmode); _child->vrun(vinit, sv, mode & _rwmode, _command); // the child does not return, assume we are the parent ::close(sv[1]); _pd = sv[0]; _state_plus |= connected | ( mode & _rwmode ) & ~eox_any; return true; } size_t PipeSocket::send(const char *buf, size_t len, int msgf) { size_t n = 0; // filter out the cases where SIGPIPE wouldn't be appropriate if(! ((len == 0) && (msgf & (msg_eot | msg_eom)))) n = ::send(_pd, buf, len, msgf & ~(msg_eot | msg_eom)); if(n == (size_t)-1) ERROS("send failed"); if( msgf & ( msg_eom | msg_eom ) ) { if( msgf & msg_eom ) { // never shut down write end twice, since this can close the read end! if(! ( _state_plus & ( eom_placed | eot_placed ) ) ) shutdown(send_mode); _state_plus |= eom_placed; } if( msgf & msg_eot ) { // never shut down write end twice, since this can close the read end! if(! ( _state_plus & ( eom_placed | eot_placed ) ) ) shutdown(send_mode); _state_plus |= eot_placed; } } return n; } size_t PipeSocket::recv(char *buf, size_t max, int) { if(! (_state_plus & connected)) ERROR("not connected"); size_t n = read(_pd, buf, max); if(n == (size_t)-1) ERROS("read failed"); if( n == 0 ) _state_plus |= eom_seen | eot_seen; return n; } void PipeSocket::shutdown(flags_t mode) { if(mode & recv_mode) ::shutdown(_pd, 0); else if(mode & send_mode) ::shutdown(_pd, 1); else ::shutdown(_pd, 2); _state_plus = _state_plus & ~( mode & send_recv ); } void PipeSocket::close(flags_t) // close the pipe unlike popen(3) we don't have to care for our child // process at all, since it is a managed Proc! { _state_plus = _state_plus & ~connected & ~send_recv; if(_pd != -1) { ::close(_pd); _pd = -1; } if(_child != NULL) { delete _child; _child = NULL; } } bool PipeSocket::async(bool on, pid_t pid) { bool old = _state_plus & async_mode; int flags; if(fcntl(_pd, F_GETFL, &flags) == -1) ERROS("fcntl:F_GETFL:"); if(on) { if(pid != 0) if(fcntl(_pd, F_SETOWN, &pid) == -1) ERROS("fcntl:F_SETOWN:"); flags |= O_ASYNC; _state_plus |= async_mode; if(fcntl(_pd, F_SETFL, &flags) == -1) ERROS("fcntl:F_SETFL:"); } else // off { flags &= ~O_ASYNC; if(fcntl(_pd, F_SETFL, &flags) == -1) ERROS("fcntl:F_SETFL:"); _state_plus &= ~async_mode; } return old; } bool PipeSocket::noblock(bool on) { bool old = _state_plus & noblock_mode; int flags; if(fcntl(_pd, F_GETFL, &flags) == -1) ERROS("fcntl:F_GETFL:"); if(on) { flags |= O_NONBLOCK; _state_plus |= noblock_mode; if(fcntl(_pd, F_SETFL, &flags) == -1) ERROS("fcntl:F_SETOWN:"); } else // off { flags &= ~O_NONBLOCK; if(fcntl(_pd, F_SETFL, &flags) == -1) ERROS("fcntl:F_SETFL:"); _state_plus &= ~noblock_mode; } return old; } bool PipeSocket::is_sendready(int sec, int usec) const { FdSet rfds; FdSet tfds(_pd, -1); select_again: int n = tfds.select(NULL, &rfds, NULL, sec, usec); if(n == -1) if(errno == EINTR) goto select_again; else ERROS("select"); return n != 0; } bool PipeSocket::is_recvready(int sec, int usec) const { FdSet rfds; FdSet tfds(_pd, -1); select_again: int n = tfds.select(&rfds, NULL, NULL, sec, usec); if(n == -1) if(errno == EINTR) goto select_again; else ERROS("select"); return n != 0; } bool PipeSocket::is_exception(int sec, int usec) const { FdSet rfds; FdSet tfds(_pd, -1); select_again: int n = tfds.select(NULL, NULL, &rfds, sec, usec); if(n == -1) if(errno == EINTR) goto select_again; else ERROS("select"); return n != 0; }