00001 #define TRACE_NAME "FlowManager"
00002 #include "FlowManager.h"
00003 #include <sys/types.h>
00004 #include <sys/stat.h>
00005 #include <fcntl.h>
00006 #include <errno.h>
00007 #include <stdio.h>
00008 #include <sys/time.h>
00009 #include <unistd.h>
00010 #include <cassert>
00011
00012
00014
00015
00016
00018
// Definition of the static member FlowManager::main; it is default-constructed
// (see FlowManager::FlowManager() below) during static initialization.
00019 FlowManager FlowManager::main;
00020
00021 FlowManager::FlowManager()
00022 : streams(16), runOnlyPending(false), maxfd(-1)
00023 {
00024 for (int i = 0; i < 16; i++) streams[i] = 0;
00025 }
00026
#if 0
// Everything up to the matching #endif is excluded from compilation.
// These are placeholder definitions for copy construction, assignment,
// comparison, hashing and printing; each body is only the NOT_IMPLEMENTED
// macro.

FlowManager::FlowManager(const FlowManager& other)
{
    NOT_IMPLEMENTED
}

const FlowManager& FlowManager::operator=(const FlowManager& other)
{
    NOT_IMPLEMENTED
}

bool FlowManager::operator==(const FlowManager& other) const
{
    NOT_IMPLEMENTED
}

bool FlowManager::operator<(const FlowManager& other) const
{
    NOT_IMPLEMENTED
}

size_t FlowManager::hash() const
{
    NOT_IMPLEMENTED
}

std::ostream& FlowManager::print_to(std::ostream& stream) const
{
    NOT_IMPLEMENTED
}

#endif
00060
00061 FlowManager::~FlowManager()
00062 {
00063
00064 }
00065
00067
00068
00069
00071
/**
 * Reports whether @p s begins with something shaped like a URI scheme:
 * one or more characters drawn from ASCII letters and '-', immediately
 * followed by ':'.
 *
 * @param s NUL-terminated string to inspect.
 * @return true  if a ':' is reached after at least one scheme character;
 *         false if the string starts with ':', contains any other character
 *               before the first ':', or has no ':' at all.
 */
static bool absoluteURI(const char* s)
{
    const char* cursor = s;
    while (*cursor != '\0') {
        const char c = *cursor;

        if (c == ':') {
            // A colon only counts when a non-empty scheme precedes it.
            return cursor != s;
        }

        const bool lower = (c >= 'a' && c <= 'z');
        const bool upper = (c >= 'A' && c <= 'Z');
        if (!(lower || upper || c == '-')) {
            // Any other character before the colon rules out a scheme.
            return false;
        }

        ++cursor;
    }

    // Ran off the end without meeting a colon.
    return false;
}
00106
00107 FlowManager::Stream::Stream(const char* address,
00108 ByteSink* streamTo,
00109 Direction direction,
00110 bool returnWhenDone)
00111 :
00112 address(address),
00113 streamTo(streamTo),
00114 direction(direction),
00115 fd(-1),
00116 wfd(-1),
00117 returnWhenDone(returnWhenDone)
00118 {
00119
00120 if (!strncmp(address, "-", 1)) {
00121 switch (direction) {
00122 case FlowManager::READ:
00123 fd = 0;
00124 break;
00125 case FlowManager::WRITE:
00126 fd = 1;
00127 break;
00128 case FlowManager::READWRITE:
00129 fd = 0;
00130 break;
00131 }
00132 return;
00133 }
00134
00135 if (absoluteURI(address)) {
00136
00137 if (!strcmp(address, "x-shell:")) {
00138
00139 TRACE "forking a subprocess!\n";
00140
00141 int toChild[2];
00142 int fromChild[2];
00143 assert(0 == pipe(toChild));
00144 assert(0 == pipe(fromChild));
00145
00146 pid_t pid = fork();
00147
00148 if (pid) {
00149
00150 close(toChild[0]);
00151 close(fromChild[1]);
00152 fd = fromChild[0];
00153 wfd = toChild[1];
00154 return;
00155 } else {
00156
00157 close(toChild[1]);
00158 close(fromChild[0]);
00159 assert(-1 != dup2(toChild[0], 0));
00160 assert(-1 != dup2(fromChild[1], 1));
00161
00162
00163
00164 char* s = strdup("/tmp/blindfold.stderr.XXXXXX");
00165 int efd = mkstemp(s);
00166 assert(-1 != dup2(efd, 2));
00167 free(s);
00168 assert(-1 != execlp("sh", "sh", 0));
00169 }
00170 } else {
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182 std::cerr << "Can't open \"" << address << "\": ";
00183 std::cerr << "Absolute-URI addresses are not implemented." << std::endl;
00184 exit(1);
00185
00186 }
00187 }
00188 int flags = 0;
00189 switch (direction) {
00190 case FlowManager::READ:
00191 flags = O_RDONLY;
00192 break;
00193 case FlowManager::WRITE:
00194 flags = O_CREAT | O_WRONLY | O_TRUNC;
00195 break;
00196 case FlowManager::READWRITE:
00197 flags = O_RDWR | O_APPEND;
00198 break;
00199 }
00200 flags |= O_NDELAY;
00201
00202 fd = open(address, flags, 0600);
00203
00204 if (fd == -1) {
00205 perror(address);
00206
00207 exit(1);
00208 }
00209 }
00210
00211
00212 void FlowManager::runPending()
00213 {
00214 runOnlyPending = true;
00215 run();
00216 runOnlyPending = false;
00217 }
00218
00219 void FlowManager::run()
00220 {
00221 fd_set readfds, writefds, exceptfds;
00222 struct timeval tv;
00223 int retval;
00224 char buf[8192];
00225 ssize_t bytes;
00226
00227 keepRunning = true;
00228 while (keepRunning) {
00229
00230
00231
00232
00233
00234 FD_ZERO(&readfds);
00235 FD_ZERO(&writefds);
00236 FD_ZERO(&exceptfds);
00237
00238 bool work_to_do = false;
00239 for (int i = 0; i <= maxfd; i++) {
00240 if (streams[i] == 0) continue;
00241 TRACE " stream " << i << ": " << streams[i] << std::endl;
00242 switch (streams[i]->direction) {
00243 case READ:
00244 FD_SET(i, &readfds);
00245 work_to_do = true;
00246 TRACE " (want to read)" << std::endl;
00247 break;
00248 case WRITE:
00249 if (streams[i]->streamFrom.str().size()) {
00250 if (streams[i]->wfd == -1) {
00251 FD_SET(i, &writefds);
00252 TRACE " (want to write on normal fd)" << std::endl;
00253 } else {
00254 FD_SET(streams[i]->wfd, &writefds);
00255 TRACE " (want to write on " << streams[i]->wfd << std::endl;
00256 }
00257 work_to_do = true;
00258 TRACE " ... with some bytes to write" << std::endl;
00259 }
00260 break;
00261 case READWRITE:
00262 FD_SET(i, &readfds);
00263 work_to_do = true;
00264 if (streams[i]->streamFrom.str().size()) {
00265 if (streams[i]->wfd == -1) {
00266 FD_SET(i, &writefds);
00267 TRACE " (want to write on normal fd)" << std::endl;
00268 } else {
00269 FD_SET(streams[i]->wfd, &writefds);
00270 TRACE " (want to write on " << streams[i]->wfd << std::endl;
00271 }
00272 work_to_do = true;
00273 TRACE " ... with some bytes to write" << std::endl;
00274 }
00275 break;
00276 }
00277 FD_SET(i, &exceptfds);
00278 }
00279
00280 if (runOnlyPending && !work_to_do) {
00281 TRACE "Returning from runPending -- no more work to do." << std::endl;
00282 return;
00283 }
00284
00285
00286 tv.tv_sec = 0;
00287 tv.tv_usec = 50000;
00288
00289
00290
00291
00292 retval = select(maxfd+1, &readfds, &writefds, &exceptfds, &tv);
00293
00294
00295
00296
00297 if (retval) {
00298 TRACE "select returned " << retval << std::endl;
00299 for (int i = 0; i <= maxfd; i++) {
00300 Stream* s = streams[i];
00301 if (!s) continue;
00302 if (FD_ISSET(i, &readfds)) {
00303 bytes = read(i, &buf, sizeof(buf));
00304 TRACE "readable fd " << i << ", " << bytes << " read\n";
00305 if (bytes < 0) {
00306 if (errno == EINTR || errno == EAGAIN) {
00307
00308 } else {
00309 perror("read");
00310 exit(1);
00311 }
00312 } else {
00313 if (bytes != 0) {
00314 s->streamTo->write(&buf, bytes);
00315 } else {
00316 s->streamTo->close();
00317 if (s->direction == READ) {
00318 bool r = streams[i]->returnWhenDone;
00319 delete(s);
00320 streams[i] = 0;
00321 if (r) keepRunning = false;
00322 } else {
00323
00324
00325 bool r = streams[i]->returnWhenDone;
00326 delete(s);
00327 streams[i] = 0;
00328 if (r) keepRunning = false;
00329 }
00330 }
00331 }
00332 }
00333 int wfd = i;
00334 if (s->wfd != -1) wfd = s->wfd;
00335 if (FD_ISSET(wfd, &writefds)) {
00336
00337 int bytes_avail = s->streamFrom.str().size();
00338 bytes = write(wfd, s->streamFrom.str().c_str(), bytes_avail);
00339 TRACE "writable fd " << i << "/" << wfd << ", " << bytes << " written (of "
00340 << bytes_avail << "queued)" << std::endl;
00341 if (bytes >= 0) {
00342
00343 std::string leftover=s->streamFrom.str().substr(bytes);
00344 s->streamFrom.str(leftover);
00345 bytes_avail = s->streamFrom.str().size();
00346 TRACE " queued is now " << bytes_avail << std::endl;
00347 if (bytes == bytes_avail) {
00348
00349 s->streamFrom.clear(0);
00350 }
00351 } else {
00352 if (errno == EINTR || errno == EAGAIN) {
00353
00354 } else {
00355 perror("write");
00356 exit(1);
00357 }
00358 }
00359 }
00360 if (FD_ISSET(i, &exceptfds)) {
00361 std::cerr << "ignoring exception on fd " << i << std::endl;
00362 }
00363 if (wfd!=i && FD_ISSET(wfd, &exceptfds)) {
00364 std::cerr << "ignoring exception on fd " << i << std::endl;
00365 }
00366 }
00367 } else {
00368 TRACE "Select timed out.\n";
00369 if (runOnlyPending) {
00370 TRACE "Returning from runPending, because of timeout, even with work to do. This may be a bug." << std::endl;
00371 return;
00372 }
00373
00374 }
00375
00376
00377
00378 }
00379
00380
00381
00382
00383
00384
00385 }
00386
00387
00388
00390
00391
00392
00394
00395
00396
00397 void FlowManager::addStream(Stream* s)
00398 {
00399 TRACE "new fd is " << s->fd << std::endl;
00400 if (s->fd > maxfd) {
00401 maxfd=s->fd;
00402 TRACE "maxfd upped to " << maxfd << std::endl;
00403 } else {
00404 TRACE "which is <= " << maxfd << std::endl;
00405 }
00406 if (s->fd+1 > (int) streams.size()) {
00407 streams.resize(s->fd+1);
00408 TRACE "streams resized to " << s->fd+1 << std::endl;
00409 }
00410 streams[s->fd] = s;
00411 }
00412 #undef TRACE_NAME