Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Namespace Members | Compound Members | File Members | Related Pages | Examples

vos/corelibs/vos/remotestreamsite.cc

Go to the documentation of this file.
00001 /*
00002     This file is part of the Virtual Object System of
00003     the Interreality project (http://interreality.org).
00004 
00005     Copyright (C) 2001-2003 Peter Amstutz
00006 
00007     This library is free software; you can redistribute it and/or
00008     modify it under the terms of the GNU Lesser General Public
00009     License as published by the Free Software Foundation; either
00010     version 2 of the License, or (at your option) any later version.
00011 
00012     This library is distributed in the hope that it will be useful,
00013     but WITHOUT ANY WARRANTY; without even the implied warranty of
00014     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015     Lesser General Public License for more details.
00016 
00017     You should have received a copy of the GNU Lesser General Public
00018     License along with this library; if not, write to the Free Software
00019     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00020 
00021     Peter Amstutz <tetron@interreality.org>
00022 */
00023 
00024 #include "remotestreamsite.hh"
00025 #include "messageblock.hh"
00026 #include "message.hh"
00027 
00028 using namespace VOS;
00029 
00030 #include <stdio.h>
00031 #include <errno.h>
00032 #include <sys/types.h>
00033 #include <sys/stat.h>
00034 #include <fcntl.h>
00035 
00036 RemoteStreamSite::RemoteStreamSite(FILE* readfd, FILE* writefd)
00037     : VobjectImplementation("fd", 0, false), RemoteMetaObject("fd", 0),
00038       RemoteVobject("fd", 0), MetaObject(0),
00039       readingFILE(readfd), writingFILE(writefd), pretendIsLocal(false), isScript(false),
00040       partialMessage(new MessageBlock())
00041 {
00042 }
00043 
00044 RemoteStreamSite::~RemoteStreamSite()
00045 {
00046     LOG("remotestreamsite", 5, "deleting " << getURL().getString());
00047     if(partialMessage) partialMessage->release();
00048 }
00049 
00050 int RemoteStreamSite::readStream(char* data, unsigned int datasize)
00051 {
00052     return (int)fread(data, 1, datasize, readingFILE);
00053 }
00054 
00055 int RemoteStreamSite::writeStream(const char* data, unsigned int datasize)
00056 {
00057     return (int)fwrite(data, 1, datasize, writingFILE);
00058 }
00059 
00060 void RemoteStreamSite::sendMessage(Message* m)
00061 {
00062 /*    if(outgoingLock) {
00063         if(enqueueOutgoing) {
00064             m->acquire();  // released by: enableOutgoing()
00065             outgoingQueue.push(pair<Message*, MessageBlock*>(m, 0));
00066         }
00067         } else {*/
00068         Site::sendMessage(m);
00069         LOG("RemoteStreamSite::sendMessage", 4, ">>>(outgoing)>>> sending to " << getURL().getString() << " [" << m->refcount_debug
00070             << "/" << m->getCount() <<  "]\n" << m->getFormattedString());
00071         LOG("refcount", 5, "sendMessage: count on this remotesite is " << getCount());
00072 
00073         if(writingFILE > 0) {
00074             const string& s = m->getFormattedString();
00075             if(writeStream(s.c_str(), (unsigned int)s.size()) < 1) {
00076                 throw SiteConnectionError("disconnected");
00077             }
00078         }
00079 //    }
00080 }
00081 
00082 void RemoteStreamSite::sendMessage(MessageBlock* m)
00083 {
00084 /*    if(outgoingLock) {
00085         if(enqueueOutgoing) {
00086             m->acquire();  // released by: enableOutgoing()
00087             outgoingQueue.push(pair<Message*, MessageBlock*>(0, m));
00088         }
00089         } else {*/
00090         Site::sendMessage(m);
00091         LOG("RemoteStreamSite::sendMessage", 4, ">>>(outgoing)>>> sending to " << getURL().getString() << "\n" << m->getString());
00092         LOG("refcount", 5, "sendMessage: count on this remotesite is " << getCount());
00093 
00094         if(writingFILE > 0) {
00095             const string& s = m->getString();
00096             if(writeStream(s.c_str(), (unsigned int)s.size()) < 1) {
00097                 throw SiteConnectionError("disconnected");
00098             }
00099         }
00100 //    }
00101 }
00102 
00103 void RemoteStreamSite::handleDisconnection()
00104 {
00105     LOG("remotesite", 3, "remote site disconnected: " << strerror(errno));
00106     LOG("refcount", 5, "handleDisconnection: remote site count is " << getCount());
00107 
00108     //LOG("remotesite", 4, "closing connection rfd " << readingFD << "  wrf " << writingFD);
00109 
00110     if(readingFILE) fclose(readingFILE);
00111     if(writingFILE) fclose(writingFILE);
00112     //allOpenSockets.erase(readingFD);
00113     readingFILE = writingFILE = 0;
00114 
00115     if(! isScript) excise();
00116 }
00117 
00118 void RemoteStreamSite::flushIncomingBuffers()
00119 {
00120     LOG("remotestreamsite", 5, "check baby, remote stream site " << this);
00121     if(readingFILE == 0) return;
00122     do {
00123         char data[1024];
00124 
00125         LOG("remotestreamsite", 5, "about to go into readStream");
00126         int r=readStream(data, sizeof(data));
00127         LOG("remotestreamsite", 4, "read stream got " << r);
00128         if(r == -1 || r == 0) {
00129             handleDisconnection();
00130             return;
00131         }
00132         string s((const char*)data, r);
00133         LOG("remotestreamsite", 5, "received " << s);
00134         int p=partialMessage->parseUpdate(s);
00135         LOG("remotestreamsite", 4, "parseUpdate returned " << p);
00136         while(p > 0) {
00137             vRef<Message> last = partialMessage->lastMessage();
00138             static int incoming_count = 1;
00139             last->incoming_debug = incoming_count++;
00140             localpeer->scheduleMessageBlock(partialMessage, 0, this);
00141             partialMessage->release(); // acquired by: new MessageBlock() above or below
00142             s.erase(s.begin(), s.begin() + p);
00143             partialMessage = new MessageBlock(); // released by: completed message (above) or destructor
00144             if(s.size() > 0) {
00145                 p = partialMessage->parseUpdate(s);
00146             } else break;
00147         }
00148     } while(1);
00149 }
00150 
00151 bool RemoteStreamSite::runScript(const string& filename, LocalSite& ls, Message* lastmessage)
00152 {
00153     FILE* fd = 0;
00154 
00155 #ifdef HAVE_POPEN
00156     FILE* gzip = 0;
00157     if(filename.substr(filename.size()-3, 3) == ".gz") {
00158         char run[512];
00159         snprintf(run, sizeof(run), "gunzip -c '%s'", filename.c_str());
00160         if(!(gzip = popen(run, "r"))) return false;
00161         fd = fileno(gzip);
00162     } else {
00163 #endif
00164         fd = fopen(filename.c_str(), "r");
00165 #ifdef HAVE_POPEN
00166     }
00167 #endif
00168 
00169     if(fd > 0) {
00170         pREF(RemoteStreamSite*, rss, new RemoteStreamSite(fd, 0),
00171              rss->pretendIsLocal = true;
00172              rss->isScript = true;
00173              string f = filename;
00174              for(unsigned int i = 0; i < f.size(); i++) if(f[i] == '/') f[i]='_';
00175              rss->setURL(URL("vop", f, "0", ""));
00176              rss->addHostAlias(f+":0");
00177              doSitePeering(&ls, rss, true, false);
00178              addSite(rss);
00179              //allOpenSockets.insert(fd);
00180              double oldtimeout = ls.getTimeoutOnSelect();
00181              ls.setTimeoutOnSelect(0);
00182              ls.flushIncomingBuffers();
00183              //LOG("remotestreamsite", 4, "0 rss readinfd is " << rss->readingFD);
00184              if(lastmessage) {
00185                  lastmessage->setSourceSite(rss);
00186                  ls.sendMessage(lastmessage);
00187              }
00188              //LOG("remotestreamsite", 4, "1 rss readinfd is " << rss->readingFD);
00189              while(rss->readingFILE > 0 || ls.checkScheduleHoldsSite(rss)) {
00190                  LOG("remotestreamsite", 4, "waiting for script...");
00191                  ls.flushIncomingBuffers();
00192              }
00193              ls.setTimeoutOnSelect(oldtimeout);
00194              LOG("remotestreamsite", 4, "about to rss excise with count " << rss->getCount());
00195              rss->excise();
00196              //allOpenSockets.erase(fd);
00197             );
00198 #ifdef HAVE_POPEN
00199         if(gzip) pclose(gzip);
00200 #endif
00201         return true;
00202     } else {
00203         LOG("remotestreamsite", 2, "Cannot open script: " << strerror(errno) << " " << filename);
00204         return false;
00205     }
00206 }
00207 
00208 bool RemoteStreamSite::isConnected()
00209 {
00210     return (isScript || writingFILE != 0);
00211 }

Generated on Tue Aug 12 03:55:41 2003 for Interreality Project - VOS by doxygen 1.3.2