Main Page   Namespace List   Class Hierarchy   Compound List   File List   Compound Members   File Members   Related Pages   Examples  

FlowManager.cpp

00001 #define TRACE_NAME "FlowManager"
00002 #include "FlowManager.h"
00003 #include <sys/types.h>
00004 #include <sys/stat.h>
00005 #include <fcntl.h>
00006 #include <errno.h>
00007 #include <stdio.h>
00008 #include <sys/time.h>
00009 #include <unistd.h>
00010 #include <cassert>
00011 
00012 
00014 //
00015 //  Standard Member Functions
00016 //
00018 
00019 FlowManager FlowManager::main;
00020 
00021 FlowManager::FlowManager()
00022     : streams(16), runOnlyPending(false), maxfd(-1)
00023 {
00024     for (int i = 0; i < 16; i++) streams[i] = 0;
00025 }
00026 
00027 #if 0  /* omit definitions until they are implemented */
00028 
00029 FlowManager::FlowManager(const FlowManager& other)
00030 {
00031     NOT_IMPLEMENTED
00032 }
00033 
00034 const FlowManager& FlowManager::operator=(const FlowManager& other)
00035 {
00036     NOT_IMPLEMENTED
00037 }
00038 
00039 bool FlowManager::operator==(const FlowManager& other) const
00040 {
00041     NOT_IMPLEMENTED
00042 }
00043 
00044 bool FlowManager::operator<(const FlowManager& other) const
00045 {
00046     NOT_IMPLEMENTED
00047 }
00048 
00049 size_t FlowManager::hash() const
00050 {
00051     NOT_IMPLEMENTED
00052 }
00053 
00054 std::ostream& FlowManager::print_to(std::ostream& stream) const
00055 {
00056     NOT_IMPLEMENTED
00057 }
00058 
00059 #endif /* omit definitions until they are implemented */
00060     
00061 FlowManager::~FlowManager()
00062 {
00063     // blank default    
00064 }
00065 
00067 //
00068 //  Additional Public Member Functions
00069 //
00071 
00072 static bool absoluteURI(const char* s) 
00073 {
00074     for (const char* p=s; *p; p++) {
00075     switch (*p) {
00076     case ':':
00077         if (p==s) {
00078         return false;
00079         } else {
00080         return true;
00081         }
00082     case 'a':   case 'b':   case 'c':   case 'd':
00083     case 'e':   case 'f':   case 'g':   case 'h':
00084     case 'i':   case 'j':   case 'k':   case 'l':
00085     case 'm':   case 'n':   case 'o':   case 'p':
00086     case 'q':   case 'r':   case 's':   case 't':
00087     case 'u':   case 'v':   case 'w':   case 'x':
00088     case 'y':   case 'z':
00089     case 'A':   case 'B':   case 'C':   case 'D':
00090     case 'E':   case 'F':   case 'G':   case 'H':
00091     case 'I':   case 'J':   case 'K':   case 'L':
00092     case 'M':   case 'N':   case 'O':   case 'P':
00093     case 'Q':   case 'R':   case 'S':   case 'T':
00094     case 'U':   case 'V':   case 'W':   case 'X':
00095     case 'Y':   case 'Z':
00096     case '-':
00097         // so far it might be; let's keep looking...
00098         break;
00099     default:
00100         // it can't be a URI-Scheme, so this must be relative
00101         return false;
00102     }
00103     }
00104     return false;
00105 }
00106 
00107 FlowManager::Stream::Stream(const char* address, 
00108                 ByteSink* streamTo, 
00109                 Direction direction,
00110                 bool returnWhenDone)
00111     : 
00112     address(address), 
00113     streamTo(streamTo), 
00114     direction(direction), 
00115     fd(-1),
00116     wfd(-1),
00117     returnWhenDone(returnWhenDone)
00118 {
00119 
00120     if (!strncmp(address, "-", 1)) {
00121     switch (direction) {
00122     case FlowManager::READ:
00123         fd = 0;
00124         break;
00125     case FlowManager::WRITE:
00126         fd = 1;
00127         break;
00128     case FlowManager::READWRITE:
00129         fd = 0;
00130         break;
00131     }
00132     return;
00133     }
00134 
00135     if (absoluteURI(address)) {
00136     
00137     if (!strcmp(address, "x-shell:")) {
00138 
00139         TRACE "forking a subprocess!\n";
00140 
00141         int toChild[2];
00142         int fromChild[2];
00143         assert(0 == pipe(toChild));
00144         assert(0 == pipe(fromChild));
00145 
00146         pid_t pid = fork();
00147 
00148         if (pid) {
00149         // in parent
00150         close(toChild[0]);
00151         close(fromChild[1]);
00152         fd = fromChild[0];
00153         wfd = toChild[1];
00154         return;
00155         } else {
00156         // in child
00157         close(toChild[1]);
00158         close(fromChild[0]);
00159         assert(-1 != dup2(toChild[0], 0));
00160         assert(-1 != dup2(fromChild[1], 1));
00161         // leave stderr as is....
00162         // or close it...?
00163         // or make it a file?
00164         char* s = strdup("/tmp/blindfold.stderr.XXXXXX");
00165         int efd = mkstemp(s);
00166         assert(-1 != dup2(efd, 2));
00167         free(s);
00168         assert(-1 != execlp("sh", "sh", 0));
00169         }
00170     } else {
00171 
00172         // should do full parsing, use http (etc) subsystems
00173 
00174         // IMPLEMENT   "x-shell:xsb%20"  :-)
00175         //    as fork & set async
00176         //    have a timer-based run-queue thing, so we 
00177         //    CAN set timeouts.   And feed those into
00178         //    the stream from a process.   And how to kill?
00179         //     .drop(Stream) ...?
00180         //
00181 
00182         std::cerr << "Can't open \"" << address << "\": ";
00183         std::cerr << "Absolute-URI addresses are not implemented." << std::endl;
00184         exit(1);
00185 
00186     }
00187     }
00188     int flags = 0;
00189     switch (direction) {
00190     case FlowManager::READ:
00191     flags = O_RDONLY;
00192     break;
00193     case FlowManager::WRITE:
00194     flags = O_CREAT | O_WRONLY | O_TRUNC;
00195     break;
00196     case FlowManager::READWRITE:
00197     flags = O_RDWR | O_APPEND;
00198     break;
00199     }
00200     flags |= O_NDELAY;
00201 
00202     fd = open(address, flags, 0600);
00203 
00204     if (fd == -1) {
00205     perror(address);
00206     // should throw error
00207     exit(1);
00208     }
00209 }
00210 
00211 
00212 void FlowManager::runPending()
00213 {
00214     runOnlyPending = true;
00215     run();
00216     runOnlyPending = false;
00217 }
00218 
00219 void FlowManager::run()
00220 {
00221     fd_set readfds, writefds, exceptfds;
00222     struct timeval tv;
00223     int retval;
00224     char buf[8192];
00225     ssize_t bytes;
00226 
00227     keepRunning = true;
00228     while (keepRunning) {
00229 
00230     //
00231     // Set flags for select
00232     //
00233 
00234     FD_ZERO(&readfds);
00235     FD_ZERO(&writefds);
00236     FD_ZERO(&exceptfds);
00237 
00238     bool work_to_do = false;
00239     for (int i = 0; i <= maxfd; i++) {
00240         if (streams[i] == 0) continue;
00241         TRACE "  stream " << i << ": " << streams[i] << std::endl;
00242         switch (streams[i]->direction) {
00243         case READ:
00244         FD_SET(i, &readfds);
00245         work_to_do = true;
00246         TRACE "  (want to read)" << std::endl;
00247         break;
00248         case WRITE:
00249         if (streams[i]->streamFrom.str().size()) {
00250             if (streams[i]->wfd == -1) {
00251             FD_SET(i, &writefds);
00252             TRACE "  (want to write on normal fd)" << std::endl;
00253             } else {
00254             FD_SET(streams[i]->wfd, &writefds);
00255             TRACE "  (want to write on " << streams[i]->wfd << std::endl;
00256             }
00257             work_to_do = true;
00258             TRACE "  ... with some bytes to write" << std::endl;
00259         }
00260         break;
00261         case READWRITE:
00262         FD_SET(i, &readfds);
00263         work_to_do = true;
00264         if (streams[i]->streamFrom.str().size()) {
00265             if (streams[i]->wfd == -1) {
00266             FD_SET(i, &writefds);
00267             TRACE "  (want to write on normal fd)" << std::endl;
00268             } else {
00269             FD_SET(streams[i]->wfd, &writefds);
00270             TRACE "  (want to write on " << streams[i]->wfd << std::endl;
00271             }
00272             work_to_do = true;
00273             TRACE "  ... with some bytes to write" << std::endl;
00274         }
00275         break;
00276         }
00277         FD_SET(i, &exceptfds);
00278     }
00279     
00280     if (runOnlyPending && !work_to_do) {
00281         TRACE "Returning from runPending -- no more work to do." << std::endl;
00282         return;
00283     }
00284 
00285     /* Wait up to 0.05 seconds (~human response time)  */
00286     tv.tv_sec = 0;
00287     tv.tv_usec = 50000;
00288 
00289     //
00290     // Call Select
00291     //
00292     retval = select(maxfd+1, &readfds, &writefds, &exceptfds, &tv);
00293 
00294     //
00295     // Check & act on flags from select
00296     //
00297     if (retval) {
00298         TRACE "select returned " << retval << std::endl;
00299         for (int i = 0; i <= maxfd; i++) {
00300         Stream* s = streams[i];
00301         if (!s) continue;
00302         if (FD_ISSET(i, &readfds)) {
00303             bytes = read(i, &buf, sizeof(buf));
00304             TRACE "readable fd " << i << ", " << bytes << " read\n";
00305             if (bytes < 0) { 
00306             if (errno == EINTR || errno == EAGAIN) {
00307                 // do nothing; we'll just try again
00308             } else {
00309                 perror("read"); 
00310                 exit(1);
00311             }
00312             } else {
00313             if (bytes != 0) {
00314                 s->streamTo->write(&buf, bytes);
00315             } else {
00316                 s->streamTo->close();
00317                 if (s->direction == READ) {
00318                 bool r = streams[i]->returnWhenDone;
00319                 delete(s);
00320                 streams[i] = 0;
00321                 if (r) keepRunning = false;
00322                 } else {
00323                 // BUG remove read portion, but what
00324                 // about write?
00325                 bool r = streams[i]->returnWhenDone;
00326                 delete(s);
00327                 streams[i] = 0;
00328                 if (r) keepRunning = false;
00329                 }
00330             }
00331             }
00332         }
00333         int wfd = i;
00334         if (s->wfd != -1) wfd = s->wfd;
00335         if (FD_ISSET(wfd, &writefds)) {
00336             
00337             int bytes_avail = s->streamFrom.str().size();
00338             bytes = write(wfd, s->streamFrom.str().c_str(), bytes_avail);
00339             TRACE "writable fd " << i << "/" << wfd << ", " << bytes << " written (of "
00340                   << bytes_avail << "queued)" << std::endl;
00341             if (bytes >= 0) {
00342             //s->streamFrom.ignore(bytes, '\07ff');   not working
00343             std::string leftover=s->streamFrom.str().substr(bytes);
00344             s->streamFrom.str(leftover);
00345             bytes_avail = s->streamFrom.str().size();
00346             TRACE "  queued is now "  << bytes_avail << std::endl;
00347             if (bytes == bytes_avail) {
00348                 // eof & fail flags probably just got set.  clear them!
00349                 s->streamFrom.clear(0);
00350             }
00351             } else {
00352             if (errno == EINTR || errno == EAGAIN) {
00353                 // do nothing; we'll just try again
00354             } else {
00355                 perror("write"); 
00356                 exit(1);
00357             }
00358             }
00359         }
00360         if (FD_ISSET(i, &exceptfds)) {
00361             std::cerr << "ignoring exception on fd " << i << std::endl;
00362         }
00363         if (wfd!=i && FD_ISSET(wfd, &exceptfds)) {
00364             std::cerr << "ignoring exception on fd " << i << std::endl;
00365         }
00366         }
00367     } else {
00368         TRACE "Select timed out.\n";
00369         if (runOnlyPending) {
00370         TRACE "Returning from runPending, because of timeout, even with work to do.   This may be a bug." << std::endl;
00371         return;
00372     }
00373 
00374     }
00375     
00376     // try runnables...
00377 
00378     }
00379 
00380     // select
00381 
00382     // try to read FD's we can
00383 
00384     // look at our output FIFOs and write anything we can
00385 }
00386 
00387 
00388 
00390 //
00391 //  Additional Private Member Functions
00392 //
00394 
00395 //    int open(char* address, Direction direction);
00396 
00397 void FlowManager::addStream(Stream* s) 
00398 {
00399     TRACE "new fd is " << s->fd << std::endl;
00400     if (s->fd > maxfd) {
00401     maxfd=s->fd;
00402     TRACE "maxfd upped to " << maxfd << std::endl;
00403     } else {
00404     TRACE "which is <= " << maxfd << std::endl;
00405     }
00406     if (s->fd+1 > (int) streams.size()) {
00407     streams.resize(s->fd+1);
00408     TRACE "streams resized to " << s->fd+1 << std::endl;
00409     }
00410     streams[s->fd] = s;
00411 }
00412 #undef TRACE_NAME

Home to blindfold. This page generated via doxygen 1.2.11.1 Wed Oct 10 16:40:33 2001.