Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Namespace Members | Compound Members | File Members | Related Pages | Examples

vos/corelibs/vos/remotesocketsite.cc

Go to the documentation of this file.
00001 /*
00002     This file is part of the Virtual Object System of
00003     the Interreality project (http://interreality.org).
00004 
00005     Copyright (C) 2001-2003 Peter Amstutz
00006 
00007     This library is free software; you can redistribute it and/or
00008     modify it under the terms of the GNU Lesser General Public
00009     License as published by the Free Software Foundation; either
00010     version 2 of the License, or (at your option) any later version.
00011 
00012     This library is distributed in the hope that it will be useful,
00013     but WITHOUT ANY WARRANTY; without even the implied warranty of
00014     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015     Lesser General Public License for more details.
00016 
00017     You should have received a copy of the GNU Lesser General Public
00018     License along with this library; if not, write to the Free Software
00019     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00020 
00021     Peter Amstutz <tetron@interreality.org>
00022 */
00023 
00024 #include "remotesocketsite.hh"
00025 #include "message.hh"
00026 #include "messageblock.hh"
00027 #include "timer.hh"
00028 
00029 using namespace VOS;
00030 
00031 #include <stdio.h>
00032 #include <sys/types.h>
00033 #include <fcntl.h>
00034 #include <errno.h>
00035 
00036 #include <deque>
00037 #include <string>
00038 #include <map>

#include <boost/ref.hpp>
00039 
00040 /** @file
00041     Implements RemoteSocketSite.
00042 */
00043 
// Fallback for platforms whose headers do not define SOL_TCP.
#ifndef SOL_TCP
#define SOL_TCP 6
#endif

// Fallback definition of socklen_t.  The guard macro is defined here as well,
// so this typedef can be emitted at most once per translation unit.
#if !defined(HAVE_SOCKLEN_T) && !defined(__socklen_t_defined)
//#warning socklen_t not defined. Defining it as int.
typedef int socklen_t;
#define __socklen_t_defined 1
#endif

// Prototypes for the SOCKS-aware replacements of the resolver/socket calls,
// only needed when the library is built with SOCKS support.
#ifdef SOCKS_SUPPORT
#undef __P
#if defined (__STDC__) || defined (_AIX) \
        || (defined (__mips) && defined (_SYSTYPE_SVR4)) \
        || defined(WIN32) || defined(__cplusplus)
# define __P(protos) protos
#else
# define __P(protos) ()
#endif
extern "C" {
    struct hostent *Rgethostbyname __P((const char *));
    int Rconnect __P((int, const struct sockaddr *, socklen_t));
    ssize_t Rrecv __P((int s, void *msg, size_t len, int flags));
    ssize_t Rsend __P((int s, const void *msg, size_t len, int flags));
}
#endif
00076 
00077 void RemoteSocketSite::init(const string& hostname, unsigned short int port) throw (SiteConnectionError)
00078 {
00079     remoteport=port;
00080 
00081     struct hostent *hp;
00082     struct sockaddr_in sa;
00083 
00084 #ifdef SOCKS_SUPPORT
00085     if(getenv("VOS_USE_SOCKS")) {
00086         if((hp=Rgethostbyname(hostname.c_str())) == 0) throw SiteConnectionError("DNS lookup failed");
00087         usingSOCKS = true;
00088     } else {
00089 #endif
00090 
00091         int err;
00092 #ifdef HAVE_GETIPNODEBYNAME
00093         hp = getipnodebyname(hostname.c_str(), AF_INET, 0, &err);
00094 #else
00095         hp = gethostbyname_locked(hostname.c_str());
00096     err = h_errno;
00097 #endif
00098         if(!hp) throw SiteConnectionError("DNS lookup failed");
00099 
00100 #ifdef SOCKS_SUPPORT
00101         usingSOCKS = false;
00102     }
00103 #endif
00104 
00105     memset(&sa, 0, sizeof(sa));
00106     sa.sin_port=htons(port);
00107     sa.sin_family=AF_INET;
00108     sa.sin_addr=*((struct in_addr *)hp->h_addr);
00109 
00110 #ifdef HAVE_GETIPNODEBYNAME
00111     freehostent(hp);
00112 #else
00113     freeHostEnt(hp);
00114 #endif
00115 
00116 #ifdef SOCKS_SUPPORT
00117     if(usingSOCKS) {
00118         if((readingFD = socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
00119            Rconnect(readingFD, (struct sockaddr *)&sa, sizeof(struct sockaddr_in)) < 0)
00120         {
00121             readingFD = writingFD = -1;
00122             throw SiteConnectionError(strerror(errno));
00123         }
00124     } else {
00125 #endif
00126         if((readingFD = (int)socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
00127            connect(readingFD, (struct sockaddr *)&sa, sizeof(struct sockaddr_in)) < 0)
00128         {
00129             readingFD = writingFD = -1;
00130             throw SiteConnectionError(strerror(errno));
00131         }
00132 #ifdef SOCKS_SUPPORT
00133     }
00134 #endif
00135 
00136     writingFD = readingFD;
00137     int x = 1;
00138     setsockopt(writingFD, SOL_TCP, TCP_NODELAY, (char*)&x, sizeof(int));
00139 #ifdef WIN32
00140     unsigned long one = 1;
00141     ioctlsocket(writingFD, FIONBIO, &one);
00142 #else
00143     unsigned int one = fcntl(writingFD, F_GETFL);
00144     one = one | O_NONBLOCK;
00145     fcntl(writingFD, F_SETFL, one);
00146 #endif
00147     allOpenSockets.insert(readingFD); // need to lock this (?)
00148     char p[256];
00149 
00150     snprintf(p, sizeof(p), "%i", port);
00151     url.setPort(p);
00152     url.setHost(hostname);
00153     name = url.getHostAndPort();
00154 
00155     socklen_t st = sizeof(int);
00156 #ifdef WIN32
00157     getsockopt(writingFD, SOL_SOCKET, SO_SNDBUF, (char*)&sockSendBufSz, &st);
00158 #else
00159     getsockopt(writingFD, SOL_SOCKET, SO_SNDBUF, &sockSendBufSz, &st);
00160 #endif
00161     sockSendBufSz = sockSendBufSz / 2;
00162 
00163     addHostAlias(url.getHostAndPort());
00164     addSite(this);
00165 }
00166 
00167 RemoteSocketSite::RemoteSocketSite(const string& hostname, unsigned short int port) throw (SiteConnectionError)
00168     : VobjectImplementation(hostname, 0, false), RemoteMetaObject(hostname, 0),
00169       RemoteVobject(hostname, 0), MetaObject(0), ssl(0),
00170       partialMessage(new MessageBlock())
00171 {
00172     try {
00173         init(hostname, port);
00174     } catch(SiteConnectionError x) {
00175         throw;
00176     }
00177 }
00178 
00179 RemoteSocketSite::RemoteSocketSite(const string& hostname) throw (SiteConnectionError)
00180     : VobjectImplementation(hostname, 0, false), RemoteMetaObject(hostname, 0),
00181       RemoteVobject(hostname, 0), MetaObject(0), ssl(0),
00182       partialMessage(new MessageBlock())
00183 {
00184     try {
00185         init(hostname, VOS_DEFAULT_PORT);
00186     } catch(SiteConnectionError x) {
00187         throw;
00188     }
00189 }
00190 
00191 RemoteSocketSite::RemoteSocketSite(int fd, sockaddr_in* peername)
00192     : VobjectImplementation("fd", 0, false), RemoteMetaObject("fd", 0), RemoteVobject("fd", 0),
00193       MetaObject(0), readingFD(fd), writingFD(fd), ssl(0),
00194       partialMessage(new MessageBlock())
00195 {
00196     struct hostent* hp=0;
00197     if(getenv("VOS_DOREVERSELOOKUP")) {
00198         hp = gethostbyaddr((char*)&(peername->sin_addr.s_addr), 4, AF_INET);
00199     }
00200     char p[16];
00201     remoteport=ntohs(peername->sin_port);
00202     snprintf(p, sizeof(p), "%i", remoteport);
00203     if(hp) {
00204         url.setHost(string(hp->h_name));
00205     } else {
00206         char s[32];
00207 
00208         // Convert address from host byte order, then out in nice dotted format.
00209         // (becuase inet_ntoa() doesn't exist on all platforms)
00210         long addr = htonl(peername->sin_addr.s_addr);
00211         snprintf(s, sizeof(s), "%i.%i.%i.%i",
00212                 (int)(addr >> 24) & 0xFF,
00213                 (int)(addr >> 16) & 0xFF,
00214                 (int)(addr >> 8) & 0xFF,
00215                 (int)(addr & 0xFF) ) ;
00216 
00217         url.setHost(s);
00218     }
00219 
00220     socklen_t st = sizeof(int);
00221 #ifdef WIN32
00222     getsockopt(writingFD, SOL_SOCKET, SO_SNDBUF, (char*)&sockSendBufSz, &st);
00223 #else
00224     getsockopt(writingFD, SOL_SOCKET, SO_SNDBUF, &sockSendBufSz, &st);
00225 #endif
00226     sockSendBufSz = sockSendBufSz / 2;
00227 
00228     url.setPort(p);
00229     addHostAlias(url.getHostAndPort());
00230     addSite(this);
00231 }
00232 
/** Destructor.  Only logs the URL of the site being deleted; it does not
    close descriptors or release anything itself.

    NOTE(review): a comment in flushIncomingBuffers() says partialMessage is
    released by the destructor, but nothing is released here -- confirm
    whether that happens elsewhere.
*/
00233 RemoteSocketSite::~RemoteSocketSite()
00234 {
00235     LOG("remotesocketsite", 5, "deleting " << getURL().getString());
00236 }
00237 
/** React to the loss of the connection: log the current errno text and the
    value of getCount(), then call excise().
*/
00238 void RemoteSocketSite::handleDisconnection()
00239 {
      // errno is whatever the most recent failing call left behind.
00240     LOG("remotesite", 3, "remote site disconnected: " << strerror(errno));
00241     LOG("refcount", 5, "handleDisconnection: remote site count is " << getCount());
00242 
00243     excise();
00244 }
00245 
00246 void RemoteSocketSite::excise()
00247 {
00248     LOG("remotesite", 4, "closing connection rfd " << readingFD << "  wrf " << writingFD);
00249 
00250     if(readingFD > -1) allOpenSockets.erase(readingFD);
00251 #ifdef HAVE_LIBWS2_32
00252     if(readingFD > -1) closesocket(readingFD);
00253     if(writingFD > -1) closesocket(writingFD);
00254 #else
00255     if(readingFD > -1) close(readingFD);
00256     if(writingFD > -1) close(writingFD);
00257 #endif
00258     readingFD = writingFD = -1;
00259 
00260     RemoteSite::excise();
00261 }
00262 
00263 void RemoteSocketSite::flushIncomingBuffers()
00264 {
00265     LOG("remotestreamsite", 5, "check baby, remote stream site " << this);
00266     if(readingFD < 0) return;
00267     do {
00268         /*fd_set fs;
00269         FD_ZERO(&fs);
00270         FD_SET(readingFD, &fs);
00271         struct timeval tv;
00272         tv.tv_sec = 0;
00273         tv.tv_usec = 0;
00274         select(readingFD+1, &fs, 0, 0, &tv);
00275         if(! FD_ISSET(readingFD, &fs)) break;*/
00276 
00277         char data[16384];
00278 
00279         LOG("remotesocketsite", 5, "about to go into readStream");
00280         int r=readStream(data, sizeof(data));               
00281         LOG("remotesocketsite", 4, "read stream got " << r);
00282         if(r == -1 || r == 0) {
00283 #ifdef WIN32
00284             int err = WSAGetLastError();
00285             if(!(err == WSAEWOULDBLOCK || err == WSAEINTR)) handleDisconnection();
00286 #else
00287             if(!(errno == EWOULDBLOCK || errno == EINTR)) handleDisconnection();
00288 #endif
00289             return;
00290         }
00291         string s((const char*)data, r);
00292         LOG("remotesocketsite", 5, "received " << s);
00293         int p=partialMessage->parseUpdate(s);
00294         LOG("remotesocketsite", 4, "parseUpdate returned " << p);
00295 
00296         vRef<Message> last = partialMessage->lastMessage();
00297         if(&last) {
00298             LOG("remotesocketsite", 4, "saw msg with nonce '" << last->getNonce() << "'");
00299             if(last->getNonce() != "" && waitingFor.count(last->getNonce())) {
00300                 LOG("remotesocketsite", 4, "caching msg with nonce " << last->getNonce());
00301                 waitingFor[last->getNonce()].first = true;
00302             }
00303         }
00304         while(p > 0) {
00305             static int incoming_count = 1;
00306             last->incoming_debug = incoming_count++;
00307             if(last->getNonce() != "" && waitingFor.count(last->getNonce())) {
00308                 last->acquire(); // released by: clearCaughtMessage()
00309                 waitingFor[last->getNonce()].second = &last;
00310             }
00311             localpeer->scheduleMessageBlock(partialMessage, 0, this);
00312             partialMessage->release(); // acquired by: new MessageBlock() above or below
00313             s.erase(s.begin(), s.begin() + p);
00314             partialMessage = new MessageBlock(); // released by: completed message (above) or destructor
00315             if(s.size() > 0) {
00316                 p = partialMessage->parseUpdate(s);
00317                 last = partialMessage->lastMessage();
00318                 if(&last) {
00319                     LOG("remotestreamsite", 4, "saw msg with nonce '" << last->getNonce() << "'");
00320                     if(last->getNonce() != "" && waitingFor.count(last->getNonce())) {
00321                         LOG("remotestreamsite", 4, "caching msg with nonce " << last->getNonce());
00322                         waitingFor[last->getNonce()].first = true;
00323                     }
00324                 }
00325             } else break;
00326         }
00327     } while(1);
00328 }
00329 
00330 void RemoteSocketSite::flushOutgoingBuffers(const char* newdata, unsigned int newsize)
00331 {
00332     if(outputBuffer.size() == 0 && newsize == 0) return;
00333 
00334     const char* data;
00335     unsigned int datasize;
00336     bool buffered;
00337     if(outputBuffer.size() == 0) {
00338         data = newdata;
00339         datasize = newsize;
00340         buffered = false;
00341     } else {
00342         if(newsize > 0) outputBuffer.append(newdata, newsize);
00343         data = outputBuffer.c_str();
00344         datasize = outputBuffer.size();
00345         buffered = true;
00346     }
00347 
00348     LocalSocketSite* lss = dynamic_cast<LocalSocketSite*>(localpeer);
00349 
00350     int total = 0;
00351     do {
00352         int sendsize = (datasize - total) < sockSendBufSz ? (datasize - total) : sockSendBufSz;
00353         int ret = writeStream(data + total, sendsize);
00354         if(ret == -1) {
00355 #ifdef WIN32
00356             int err = WSAGetLastError();
00357             if(err == WSAEWOULDBLOCK || err == WSAEINTR) {   
00358 #else
00359             if(errno == EWOULDBLOCK || errno == EINTR) {
00360 #endif
00361                 if(buffered) outputBuffer.replace(0, total, "");
00362                 else outputBuffer.append(data + total, datasize - total);
00363                 lss->addFDtoSelect(writingFD, LocalSocketSite::SELECTWRITE);
00364                 return;
00365             } else throw SiteConnectionError("disconnected");
00366         } else total += ret;
00367     } while(total < datasize);
00368 
00369     if(buffered) outputBuffer.replace(0, total, "");
00370     if(outputBuffer.size() == 0) {
00371         lss->removeFDfromSelect(writingFD, LocalSocketSite::SELECTWRITE);
00372     }
00373 }
00374 
00375 #ifndef MSG_NOSIGNAL
00376 #define MSG_NOSIGNAL 0
00377 #endif
00378 
00379 int RemoteSocketSite::readStream(char* data, unsigned int datasize)
00380 {
00381 #ifdef SSL_SUPPORT
00382     if(ssl) {
00383         int ret = SSL_read(ssl, data, datasize);
00384         if(ret <= 0) {
00385             if(ERR_get_error() == SSL_ERROR_WANT_READ) {
00386                 errno = EAGAIN;
00387             } else {
00388                 LOG("remotestreamsite", 2, "SSL error " << ERR_error_string(ERR_get_error(), 0));
00389             }
00390         }
00391         return ret;
00392     } else {
00393 #endif
00394 #ifdef SOCKS_SUPPORT
00395         if(usingSOCKS) {
00396             return Rrecv(readingFD, data, datasize, MSG_NOSIGNAL);
00397         } else {
00398 #endif
00399             return recv(readingFD, data, datasize, MSG_NOSIGNAL);
00400 #ifdef SOCKS_SUPPORT
00401         }
00402 #endif
00403 #ifdef SSL_SUPPORT
00404     }
00405 #endif
00406 }
00407 
00408 int RemoteSocketSite::writeStream(const char* data, unsigned int datasize)
00409 {
00410 #ifdef SSL_SUPPORT
00411     if(ssl) {
00412         int ret = SSL_write(ssl, data, datasize);
00413         if(ret <= 0) {
00414             if(ERR_get_error() == SSL_ERROR_WANT_WRITE) {
00415                 errno = EAGAIN;
00416             } else {
00417                 LOG("remotestreamsite", 2, "SSL error " << ERR_error_string(ERR_get_error(), 0));
00418             }
00419         }
00420         return ret;
00421     } else {
00422 #endif
00423 #ifdef SOCKS_SUPPORT
00424         if(usingSOCKS) {
00425             return Rsend(writingFD, data, datasize, MSG_NOSIGNAL);
00426         } else {
00427 #endif
00428             return send(writingFD, data, datasize, MSG_NOSIGNAL);
00429 #ifdef SOCKS_SUPPORT
00430         }
00431 #endif
00432 #ifdef SSL_SUPPORT
00433     }
00434 #endif
00435 }
00436 
/** Ask the remote site to switch this connection to a secure protocol and
    perform the client side of the SSL handshake.

    A "core:protocol-switch" message carrying a "protocol" field is sent,
    outgoing traffic is then held back (enqueued) while the reply with the
    same nonce is awaited.  If the reply's "result" field is "ready", an SSL
    object is created from the context obtained for @a protocol, bound to
    the reading and writing descriptors, and SSL_connect() is run.  Outgoing
    traffic is re-enabled on every exit path.

    @param protocol protocol name passed to getSSLContext() and sent to the peer
    @throw RemoteError if a secure connection already exists, if the reply
           carries a result other than "ready", if the reply has no result
           field, or if the library was built without SSL_SUPPORT
    @throw LocalSocketSite::SSLError if any of the OpenSSL calls fails

    NOTE(review): when an OpenSSL call fails after SSL_new() succeeded, the
    ssl member stays set and the object is not freed here -- confirm whether
    callers are expected to tear the site down afterwards.
    NOTE(review): pREF / rREF are project macros defined elsewhere;
    presumably they hold a reference for the duration of the enclosed
    statements -- verify against their definition.
*/
00437 void RemoteSocketSite::secureConnection(const string& protocol)
00438 {
00439 #ifdef SSL_SUPPORT
00440     if(ssl) throw RemoteError("A secure connection has already been established\n");
00441 
00442     SSL_CTX* ctx = meta_cast<LocalSocketSite*>(localpeer)->getSSLContext(protocol);
00443 
00444     pREF(Message*, m, new Message(),
00445          rREF(LocalSite&, ls, initFields(this, m, "core:protocol-switch", true), );
00446          m->insertField(-1, "protocol", protocol);
00447          sendMessage(m);
00448 
00449          suppressOutgoing(true);
00450          try {
00451              pREF(Message*, r, localpeer->waitFor(m->getNonce(), this),
00452                   if(r->getField("result").value == "ready") {
00453                       ssl = SSL_new(ctx);
00454                       if(! ssl) {
00455                           throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00456                       }
00457 
00458                       int ret = SSL_set_wfd(ssl, writingFD);
00459                       if(ret != 1) {
00460                           throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00461                       }
00462 
00463                       ret = SSL_set_rfd(ssl, readingFD);
00464                       if(ret != 1) {
00465                           throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00466                       }
00467 
00468                       ret = SSL_connect(ssl);
00469                       if(ret != 1) {
00470                           throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00471                       }
00472 
00473                       enableOutgoing(true);
00474                   } else {
00475                       enableOutgoing(true);
00476                       throw RemoteError(r->getField("result").value);
00477                   }
00478                  );
00479          } catch(Message::NoSuchFieldError) {
00480              enableOutgoing(true);
00481              throw RemoteError("reply did not contain a result field");
00482          } catch(...) {
00483              enableOutgoing(true);
00484              throw;
00485          }
00486         );
00487 #else
00488     throw RemoteError("libvos was not compiled with SSL/TLS support");
00489 #endif
00490 }
00491 
00492 void RemoteSocketSite::switchProtocol(SSL_CTX* context)
00493 {
00494 #ifdef SSL_SUPPORT
00495     ssl = SSL_new(context);
00496     if(! ssl) {
00497         throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00498     }
00499 
00500     int ret = SSL_set_wfd(ssl, writingFD);
00501     if(ret != 1) {
00502         throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00503     }
00504 
00505     ret = SSL_set_rfd(ssl, readingFD);
00506     if(ret != 1) {
00507         throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00508     }
00509 
00510     ret = SSL_accept(ssl);
00511     if(ret != 1) {
00512         throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00513     }
00514 #else
00515     throw RemoteError("libvos was not compiled with SSL/TLS support");
00516 #endif
00517 }
00518 
00519 X509* RemoteSocketSite::getCertificate()
00520 {
00521 #ifdef SSL_SUPPORT
00522     if(ssl) return SSL_get_peer_certificate(ssl);
00523     else throw RemoteError("No secure session established");
00524 #else
00525     throw RemoteError("libvos was not compiled with SSL/TLS support");
00526 #endif
00527 }
00528 
00529 bool RemoteSocketSite::isConnected()
00530 {
00531     return (readingFD > -1);
00532 }
00533 
00534 bool OutgoingLock::checkMessage(Message* m)
00535 {
00536     return (m->getMethod() == "core:anti-spoof-check"
00537              || m->getMethod() == "core:anti-spoof-reply"
00538              || m->getMethod() == "core:error");
00539 }
00540 
00541 bool EnqueueOutgoing::checkMessage(Message* m)
00542 {
00543     if(m->getMethod() == "core:anti-spoof-check"
00544         || m->getMethod() == "core:anti-spoof-reply"
00545         || m->getMethod() == "core:error")
00546     {
00547         return true;
00548     } else {
00549         m->acquire();
00550         outgoingQueue.push(pair<Message*, MessageBlock*>(m, 0));
00551         return false;
00552     }
00553 }
00554 
00555 bool EnqueueOutgoing::checkMessage(MessageBlock* m)
00556 {
00557     m->acquire();  // released by: enableOutgoing()
00558     outgoingQueue.push(pair<Message*, MessageBlock*>(0, m));
00559     return true;
00560 }
00561 
00562 bool RewriteTo::checkMessage(Message* m)
00563 {
00564     URL to(m->getTo());
00565     to.setHostAndPort(hostandport);
00566     m->setTo(to.getString());
00567     return true;
00568 }
00569 
00570 void RemoteSocketSite::suppressOutgoing(bool enqueue)
00571 {
00572     if(enqueue) {
00573         insertMessageFilter(0, &enqueueOutgoing);
00574     } else {
00575         insertMessageFilter(-1, &outgoinglock);
00576     }
00577 }
00578 
00579 void RemoteSocketSite::enableOutgoing(bool sendqueue, const string& rewriteTo)
00580 {
00581     removeMessageFilter(&outgoinglock);
00582     removeMessageFilter(&enqueueOutgoing);
00583 
00584     RewriteTo rt(rewriteTo);
00585     if(rewriteTo != "?") {
00586         insertMessageFilter(-1, &rt);
00587     }
00588 
00589     while(!enqueueOutgoing.outgoingQueue.empty()) {
00590         if(enqueueOutgoing.outgoingQueue.front().first) {
00591             if(sendqueue) sendMessage(enqueueOutgoing.outgoingQueue.front().first);
00592             enqueueOutgoing.outgoingQueue.front().first->release();  // acquired by: sendMessage()
00593         }
00594         if(enqueueOutgoing.outgoingQueue.front().second) {
00595             if(sendqueue) sendMessage(enqueueOutgoing.outgoingQueue.front().second);
00596             enqueueOutgoing.outgoingQueue.front().second->release();  // acquired by: sendMessage()
00597         }
00598         enqueueOutgoing.outgoingQueue.pop();
00599     }
00600 
00601     removeMessageFilter(&rt);
00602 }
00603 
00604 void RemoteSocketSite::sendMessage(Message* m)
00605 {
00606     for(unsigned int i = 0; i < messagefilters.size(); i++) {
00607         if(! messagefilters[i]->checkMessage(m)) {
00608             LOG("remotesocketsite", 3, "Not sending filtered message");
00609             return;
00610         }
00611     }
00612 
00613     Site::sendMessage(m);
00614     LOG("RemoteStreamSite::sendMessage", 4, ">>>(outgoing)>>> sending to " << getURL().getString()
00615         << " [" << m->refcount_debug
00616         << "/" << m->getCount() <<  "]\n" << m->getFormattedString().substr(0, 768));
00617     LOG("refcount", 5, "sendMessage: count on this remotesite is " << getCount());
00618 
00619     if(writingFD > -1) {
00620         const string& s = m->getFormattedString();
00621         flushOutgoingBuffers(s.c_str(), (unsigned int)s.size());
00622     } else throw SiteConnectionError("Tried to send a message to a disconnected site");
00623 }
00624 
00625 void RemoteSocketSite::sendMessage(MessageBlock* m)
00626 {
00627     for(unsigned int i = 0; i < messagefilters.size(); i++) {
00628         if(! messagefilters[i]->checkMessage(m)) {
00629             return;
00630         }
00631     }
00632 
00633     if(! m->numMessages()) return;
00634 
00635     Site::sendMessage(m);
00636     LOG("RemoteStreamSite::sendMessage", 4, ">>>(outgoing)>>> sending to " << getURL().getString() << "\n" << m->getString());
00637     LOG("refcount", 5, "sendMessage: count on this remotesite is " << getCount());
00638 
00639     if(writingFD > -1) {
00640         const string& s = m->getString();
00641         flushOutgoingBuffers(s.c_str(), (unsigned int)s.size());
00642     } else throw SiteConnectionError("Tried to send message to disconnected site");
00643 }
00644 
00645 /*
00646 void RemoteSocketSite::takeOver(RemoteSocketSite* rs)
00647 {
00648     readingFD = rs->readingFD;
00649     rs->readingFD = rs->writingFD = -1;
00650     rs->handleDisconnection();
00651     localpeer->takeOverMessages(rs, this);
00652 }
00653 */
00654 
00655 
00656 AsyncConnect::AsyncConnect(bool* isdone, RemoteSocketSite** rs, LocalSite* ls,
00657                            const string& host, unsigned short int port, bool isspooftest)
00658     : done(isdone), remotesite(rs), localpeer(ls), remotehost(host), remoteport(port), spooftest(isspooftest)
00659 {
00660     *done = false;
00661     *remotesite = 0;
00662 }
00663 
00664 RemoteSocketSite* AsyncConnect::connect(const string& h, unsigned short int p, LocalSite* defaultPeer, bool isspooftest)
00665 {
00666     RemoteSocketSite* rss;
00667     bool isdone;
00668 
00669     AsyncConnect ac(&isdone, &rss, defaultPeer, h, p, isspooftest);
00670 
00671     boost::thread connectthread(ac);
00672     double timeout = defaultPeer->getTimeoutOnSelect();
00673 
00674     if(timeout == -1 || timeout > 1) defaultPeer->setTimeoutOnSelect(.2);
00675     while(!isdone) {
00676         defaultPeer->flushIncomingBuffers();
00677     }
00678 
00679     connectthread.join();
00680 
00681     defaultPeer->setTimeoutOnSelect(timeout);
00682 
00683     if(rss == 0) {
00684         throw RemoteSite::SiteConnectionError(ac.exstring);
00685     }
00686 
00687     double start = getRealTime();
00688     while(rss->isConnected() && !rss->getGreeted() && (getRealTime() - start) < VOS_DEFAULT_TIMEOUT) {
00689         defaultPeer->flushIncomingBuffers();
00690     }
00691 
00692     if(!rss->isConnected() || !rss->getGreeted()) {
00693         rss->handleDisconnection();
00694         rss->release();
00695         throw RemoteSite::SiteConnectionError("Remote site disconnected or did not greet us");
00696     }
00697 
00698     return rss;
00699 }
00700 
00701 // This runs in a separate thread
00702 void AsyncConnect::operator()()
00703 {
00704     try {
00705         LOG("AsyncConnect", 3, "ready to start connecting to " << remotehost << ":" << remoteport);
00706         *remotesite = new RemoteSocketSite(remotehost, remoteport);
00707 
00708         if(spooftest) (*remotesite)->addFlag("spoof-test");
00709 
00710         if(getenv("VOS_HOSTNAME") == 0) {
00711             unsigned char ipaddr[4];
00712             string host = Site::detectHostname((*remotesite)->getReadingFD(), ipaddr);
00713             LOG("AsyncConnect", 3, "detected IP is "
00714                 << (unsigned int)ipaddr[0] << "." << (unsigned int)ipaddr[1] << "."
00715                 << (unsigned int)ipaddr[2] << "." << (unsigned int)ipaddr[3]);
00716             URL u(localpeer->getURL());
00717             u.setHost(host);
00718             localpeer->addHostAlias(u.getHostAndPort());
00719             if(!(ipaddr[0] == 10 || ipaddr[0] == 127 || (ipaddr[0] == 192 && ipaddr[1] == 168))) {
00720                 localpeer->setPrimaryHostname(host);
00721             }
00722         }
00723         Site::doSitePeering(localpeer, *remotesite, spooftest, false);
00724     } catch(runtime_error e) {
00725         exstring = e.what();
00726         LOG("remotesocketsite", 2, "Caught exception " << exstring);
00727     }
00728     *done = true;
00729     LOG("AsyncConnect", 3, "done trying to connect to " << remotehost << ":" << remoteport);
00730 }

Generated on Tue Aug 12 03:55:41 2003 for Interreality Project - VOS by doxygen 1.3.2