00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "remotestreamsite.hh"
00025 #include "messageblock.hh"
00026 #include "message.hh"
00027
00028 using namespace VOS;
00029
00030 #include <stdio.h>
00031 #include <errno.h>
00032 #include <sys/types.h>
00033 #include <sys/stat.h>
00034 #include <fcntl.h>
00035
00036 RemoteStreamSite::RemoteStreamSite(FILE* readfd, FILE* writefd)
00037 : VobjectImplementation("fd", 0, false), RemoteMetaObject("fd", 0),
00038 RemoteVobject("fd", 0), MetaObject(0),
00039 readingFILE(readfd), writingFILE(writefd), pretendIsLocal(false), isScript(false),
00040 partialMessage(new MessageBlock())
00041 {
00042 }
00043
00044 RemoteStreamSite::~RemoteStreamSite()
00045 {
00046 LOG("remotestreamsite", 5, "deleting " << getURL().getString());
00047 if(partialMessage) partialMessage->release();
00048 }
00049
00050 int RemoteStreamSite::readStream(char* data, unsigned int datasize)
00051 {
00052 return (int)fread(data, 1, datasize, readingFILE);
00053 }
00054
00055 int RemoteStreamSite::writeStream(const char* data, unsigned int datasize)
00056 {
00057 return (int)fwrite(data, 1, datasize, writingFILE);
00058 }
00059
00060 void RemoteStreamSite::sendMessage(Message* m)
00061 {
00062
00063
00064
00065
00066
00067
00068 Site::sendMessage(m);
00069 LOG("RemoteStreamSite::sendMessage", 4, ">>>(outgoing)>>> sending to " << getURL().getString() << " [" << m->refcount_debug
00070 << "/" << m->getCount() << "]\n" << m->getFormattedString());
00071 LOG("refcount", 5, "sendMessage: count on this remotesite is " << getCount());
00072
00073 if(writingFILE > 0) {
00074 const string& s = m->getFormattedString();
00075 if(writeStream(s.c_str(), (unsigned int)s.size()) < 1) {
00076 throw SiteConnectionError("disconnected");
00077 }
00078 }
00079
00080 }
00081
00082 void RemoteStreamSite::sendMessage(MessageBlock* m)
00083 {
00084
00085
00086
00087
00088
00089
00090 Site::sendMessage(m);
00091 LOG("RemoteStreamSite::sendMessage", 4, ">>>(outgoing)>>> sending to " << getURL().getString() << "\n" << m->getString());
00092 LOG("refcount", 5, "sendMessage: count on this remotesite is " << getCount());
00093
00094 if(writingFILE > 0) {
00095 const string& s = m->getString();
00096 if(writeStream(s.c_str(), (unsigned int)s.size()) < 1) {
00097 throw SiteConnectionError("disconnected");
00098 }
00099 }
00100
00101 }
00102
00103 void RemoteStreamSite::handleDisconnection()
00104 {
00105 LOG("remotesite", 3, "remote site disconnected: " << strerror(errno));
00106 LOG("refcount", 5, "handleDisconnection: remote site count is " << getCount());
00107
00108
00109
00110 if(readingFILE) fclose(readingFILE);
00111 if(writingFILE) fclose(writingFILE);
00112
00113 readingFILE = writingFILE = 0;
00114
00115 if(! isScript) excise();
00116 }
00117
00118 void RemoteStreamSite::flushIncomingBuffers()
00119 {
00120 LOG("remotestreamsite", 5, "check baby, remote stream site " << this);
00121 if(readingFILE == 0) return;
00122 do {
00123 char data[1024];
00124
00125 LOG("remotestreamsite", 5, "about to go into readStream");
00126 int r=readStream(data, sizeof(data));
00127 LOG("remotestreamsite", 4, "read stream got " << r);
00128 if(r == -1 || r == 0) {
00129 handleDisconnection();
00130 return;
00131 }
00132 string s((const char*)data, r);
00133 LOG("remotestreamsite", 5, "received " << s);
00134 int p=partialMessage->parseUpdate(s);
00135 LOG("remotestreamsite", 4, "parseUpdate returned " << p);
00136 while(p > 0) {
00137 vRef<Message> last = partialMessage->lastMessage();
00138 static int incoming_count = 1;
00139 last->incoming_debug = incoming_count++;
00140 localpeer->scheduleMessageBlock(partialMessage, 0, this);
00141 partialMessage->release();
00142 s.erase(s.begin(), s.begin() + p);
00143 partialMessage = new MessageBlock();
00144 if(s.size() > 0) {
00145 p = partialMessage->parseUpdate(s);
00146 } else break;
00147 }
00148 } while(1);
00149 }
00150
00151 bool RemoteStreamSite::runScript(const string& filename, LocalSite& ls, Message* lastmessage)
00152 {
00153 FILE* fd = 0;
00154
00155 #ifdef HAVE_POPEN
00156 FILE* gzip = 0;
00157 if(filename.substr(filename.size()-3, 3) == ".gz") {
00158 char run[512];
00159 snprintf(run, sizeof(run), "gunzip -c '%s'", filename.c_str());
00160 if(!(gzip = popen(run, "r"))) return false;
00161 fd = fileno(gzip);
00162 } else {
00163 #endif
00164 fd = fopen(filename.c_str(), "r");
00165 #ifdef HAVE_POPEN
00166 }
00167 #endif
00168
00169 if(fd > 0) {
00170 pREF(RemoteStreamSite*, rss, new RemoteStreamSite(fd, 0),
00171 rss->pretendIsLocal = true;
00172 rss->isScript = true;
00173 string f = filename;
00174 for(unsigned int i = 0; i < f.size(); i++) if(f[i] == '/') f[i]='_';
00175 rss->setURL(URL("vop", f, "0", ""));
00176 rss->addHostAlias(f+":0");
00177 doSitePeering(&ls, rss, true, false);
00178 addSite(rss);
00179
00180 double oldtimeout = ls.getTimeoutOnSelect();
00181 ls.setTimeoutOnSelect(0);
00182 ls.flushIncomingBuffers();
00183
00184 if(lastmessage) {
00185 lastmessage->setSourceSite(rss);
00186 ls.sendMessage(lastmessage);
00187 }
00188
00189 while(rss->readingFILE > 0 || ls.checkScheduleHoldsSite(rss)) {
00190 LOG("remotestreamsite", 4, "waiting for script...");
00191 ls.flushIncomingBuffers();
00192 }
00193 ls.setTimeoutOnSelect(oldtimeout);
00194 LOG("remotestreamsite", 4, "about to rss excise with count " << rss->getCount());
00195 rss->excise();
00196
00197 );
00198 #ifdef HAVE_POPEN
00199 if(gzip) pclose(gzip);
00200 #endif
00201 return true;
00202 } else {
00203 LOG("remotestreamsite", 2, "Cannot open script: " << strerror(errno) << " " << filename);
00204 return false;
00205 }
00206 }
00207
00208 bool RemoteStreamSite::isConnected()
00209 {
00210 return (isScript || writingFILE != 0);
00211 }