00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "remotesocketsite.hh"
00025 #include "message.hh"
00026 #include "messageblock.hh"
00027 #include "timer.hh"
00028
00029 using namespace VOS;
00030
00031 #include <stdio.h>
00032 #include <sys/types.h>
00033 #include <fcntl.h>
00034 #include <errno.h>
00035
00036 #include <deque>
00037 #include <string>
00038 #include <map>
00039
00040
00041
00042
00043
00044 #ifndef SOL_TCP
00045 #define SOL_TCP 6
00046 #endif
00047
00048 #if !defined(HAVE_SOCKLEN_T) && !defined(__socklen_t_defined)
00049
00050 typedef int socklen_t;
00051 #define __socklen_t_defined 1
00052 #endif
00053
00054 #ifdef SOCKS_SUPPORT
00055 #undef __P
00056 #if defined (__STDC__) || defined (_AIX) \
00057 || (defined (__mips) && defined (_SYSTYPE_SVR4)) \
00058 || defined(WIN32) || defined(__cplusplus)
00059 # define __P(protos) protos
00060 #else
00061 # define __P(protos) ()
00062 #endif
00063 extern "C" {
00064 struct hostent *Rgethostbyname __P((const char *));
00065 int Rconnect __P((int, const struct sockaddr *, socklen_t));
00066 ssize_t Rrecv __P((int s, void *msg, size_t len, int flags));
00067 ssize_t Rsend __P((int s, const void *msg, size_t len, int flags));
00068 }
00069 #endif
00070
00071 #if !defined(HAVE_SOCKLEN_T) && !defined(__socklen_t_defined)
00072
00073 typedef int socklen_t;
00074 #define __socklen_t_defined 1
00075 #endif
00076
00077 void RemoteSocketSite::init(const string& hostname, unsigned short int port) throw (SiteConnectionError)
00078 {
00079 remoteport=port;
00080
00081 struct hostent *hp;
00082 struct sockaddr_in sa;
00083
00084 #ifdef SOCKS_SUPPORT
00085 if(getenv("VOS_USE_SOCKS")) {
00086 if((hp=Rgethostbyname(hostname.c_str())) == 0) throw SiteConnectionError("DNS lookup failed");
00087 usingSOCKS = true;
00088 } else {
00089 #endif
00090
00091 int err;
00092 #ifdef HAVE_GETIPNODEBYNAME
00093 hp = getipnodebyname(hostname.c_str(), AF_INET, 0, &err);
00094 #else
00095 hp = gethostbyname_locked(hostname.c_str());
00096 err = h_errno;
00097 #endif
00098 if(!hp) throw SiteConnectionError("DNS lookup failed");
00099
00100 #ifdef SOCKS_SUPPORT
00101 usingSOCKS = false;
00102 }
00103 #endif
00104
00105 memset(&sa, 0, sizeof(sa));
00106 sa.sin_port=htons(port);
00107 sa.sin_family=AF_INET;
00108 sa.sin_addr=*((struct in_addr *)hp->h_addr);
00109
00110 #ifdef HAVE_GETIPNODEBYNAME
00111 freehostent(hp);
00112 #else
00113 freeHostEnt(hp);
00114 #endif
00115
00116 #ifdef SOCKS_SUPPORT
00117 if(usingSOCKS) {
00118 if((readingFD = socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
00119 Rconnect(readingFD, (struct sockaddr *)&sa, sizeof(struct sockaddr_in)) < 0)
00120 {
00121 readingFD = writingFD = -1;
00122 throw SiteConnectionError(strerror(errno));
00123 }
00124 } else {
00125 #endif
00126 if((readingFD = (int)socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
00127 connect(readingFD, (struct sockaddr *)&sa, sizeof(struct sockaddr_in)) < 0)
00128 {
00129 readingFD = writingFD = -1;
00130 throw SiteConnectionError(strerror(errno));
00131 }
00132 #ifdef SOCKS_SUPPORT
00133 }
00134 #endif
00135
00136 writingFD = readingFD;
00137 int x = 1;
00138 setsockopt(writingFD, SOL_TCP, TCP_NODELAY, (char*)&x, sizeof(int));
00139 #ifdef WIN32
00140 unsigned long one = 1;
00141 ioctlsocket(writingFD, FIONBIO, &one);
00142 #else
00143 unsigned int one = fcntl(writingFD, F_GETFL);
00144 one = one | O_NONBLOCK;
00145 fcntl(writingFD, F_SETFL, one);
00146 #endif
00147 allOpenSockets.insert(readingFD);
00148 char p[256];
00149
00150 snprintf(p, sizeof(p), "%i", port);
00151 url.setPort(p);
00152 url.setHost(hostname);
00153 name = url.getHostAndPort();
00154
00155 socklen_t st = sizeof(int);
00156 #ifdef WIN32
00157 getsockopt(writingFD, SOL_SOCKET, SO_SNDBUF, (char*)&sockSendBufSz, &st);
00158 #else
00159 getsockopt(writingFD, SOL_SOCKET, SO_SNDBUF, &sockSendBufSz, &st);
00160 #endif
00161 sockSendBufSz = sockSendBufSz / 2;
00162
00163 addHostAlias(url.getHostAndPort());
00164 addSite(this);
00165 }
00166
00167 RemoteSocketSite::RemoteSocketSite(const string& hostname, unsigned short int port) throw (SiteConnectionError)
00168 : VobjectImplementation(hostname, 0, false), RemoteMetaObject(hostname, 0),
00169 RemoteVobject(hostname, 0), MetaObject(0), ssl(0),
00170 partialMessage(new MessageBlock())
00171 {
00172 try {
00173 init(hostname, port);
00174 } catch(SiteConnectionError x) {
00175 throw;
00176 }
00177 }
00178
00179 RemoteSocketSite::RemoteSocketSite(const string& hostname) throw (SiteConnectionError)
00180 : VobjectImplementation(hostname, 0, false), RemoteMetaObject(hostname, 0),
00181 RemoteVobject(hostname, 0), MetaObject(0), ssl(0),
00182 partialMessage(new MessageBlock())
00183 {
00184 try {
00185 init(hostname, VOS_DEFAULT_PORT);
00186 } catch(SiteConnectionError x) {
00187 throw;
00188 }
00189 }
00190
00191 RemoteSocketSite::RemoteSocketSite(int fd, sockaddr_in* peername)
00192 : VobjectImplementation("fd", 0, false), RemoteMetaObject("fd", 0), RemoteVobject("fd", 0),
00193 MetaObject(0), readingFD(fd), writingFD(fd), ssl(0),
00194 partialMessage(new MessageBlock())
00195 {
00196 struct hostent* hp=0;
00197 if(getenv("VOS_DOREVERSELOOKUP")) {
00198 hp = gethostbyaddr((char*)&(peername->sin_addr.s_addr), 4, AF_INET);
00199 }
00200 char p[16];
00201 remoteport=ntohs(peername->sin_port);
00202 snprintf(p, sizeof(p), "%i", remoteport);
00203 if(hp) {
00204 url.setHost(string(hp->h_name));
00205 } else {
00206 char s[32];
00207
00208
00209
00210 long addr = htonl(peername->sin_addr.s_addr);
00211 snprintf(s, sizeof(s), "%i.%i.%i.%i",
00212 (int)(addr >> 24) & 0xFF,
00213 (int)(addr >> 16) & 0xFF,
00214 (int)(addr >> 8) & 0xFF,
00215 (int)(addr & 0xFF) ) ;
00216
00217 url.setHost(s);
00218 }
00219
00220 socklen_t st = sizeof(int);
00221 #ifdef WIN32
00222 getsockopt(writingFD, SOL_SOCKET, SO_SNDBUF, (char*)&sockSendBufSz, &st);
00223 #else
00224 getsockopt(writingFD, SOL_SOCKET, SO_SNDBUF, &sockSendBufSz, &st);
00225 #endif
00226 sockSendBufSz = sockSendBufSz / 2;
00227
00228 url.setPort(p);
00229 addHostAlias(url.getHostAndPort());
00230 addSite(this);
00231 }
00232
00233 RemoteSocketSite::~RemoteSocketSite()
00234 {
00235 LOG("remotesocketsite", 5, "deleting " << getURL().getString());
00236 }
00237
00238 void RemoteSocketSite::handleDisconnection()
00239 {
00240 LOG("remotesite", 3, "remote site disconnected: " << strerror(errno));
00241 LOG("refcount", 5, "handleDisconnection: remote site count is " << getCount());
00242
00243 excise();
00244 }
00245
00246 void RemoteSocketSite::excise()
00247 {
00248 LOG("remotesite", 4, "closing connection rfd " << readingFD << " wrf " << writingFD);
00249
00250 if(readingFD > -1) allOpenSockets.erase(readingFD);
00251 #ifdef HAVE_LIBWS2_32
00252 if(readingFD > -1) closesocket(readingFD);
00253 if(writingFD > -1) closesocket(writingFD);
00254 #else
00255 if(readingFD > -1) close(readingFD);
00256 if(writingFD > -1) close(writingFD);
00257 #endif
00258 readingFD = writingFD = -1;
00259
00260 RemoteSite::excise();
00261 }
00262
00263 void RemoteSocketSite::flushIncomingBuffers()
00264 {
00265 LOG("remotestreamsite", 5, "check baby, remote stream site " << this);
00266 if(readingFD < 0) return;
00267 do {
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277 char data[16384];
00278
00279 LOG("remotesocketsite", 5, "about to go into readStream");
00280 int r=readStream(data, sizeof(data));
00281 LOG("remotesocketsite", 4, "read stream got " << r);
00282 if(r == -1 || r == 0) {
00283 #ifdef WIN32
00284 int err = WSAGetLastError();
00285 if(!(err == WSAEWOULDBLOCK || err == WSAEINTR)) handleDisconnection();
00286 #else
00287 if(!(errno == EWOULDBLOCK || errno == EINTR)) handleDisconnection();
00288 #endif
00289 return;
00290 }
00291 string s((const char*)data, r);
00292 LOG("remotesocketsite", 5, "received " << s);
00293 int p=partialMessage->parseUpdate(s);
00294 LOG("remotesocketsite", 4, "parseUpdate returned " << p);
00295
00296 vRef<Message> last = partialMessage->lastMessage();
00297 if(&last) {
00298 LOG("remotesocketsite", 4, "saw msg with nonce '" << last->getNonce() << "'");
00299 if(last->getNonce() != "" && waitingFor.count(last->getNonce())) {
00300 LOG("remotesocketsite", 4, "caching msg with nonce " << last->getNonce());
00301 waitingFor[last->getNonce()].first = true;
00302 }
00303 }
00304 while(p > 0) {
00305 static int incoming_count = 1;
00306 last->incoming_debug = incoming_count++;
00307 if(last->getNonce() != "" && waitingFor.count(last->getNonce())) {
00308 last->acquire();
00309 waitingFor[last->getNonce()].second = &last;
00310 }
00311 localpeer->scheduleMessageBlock(partialMessage, 0, this);
00312 partialMessage->release();
00313 s.erase(s.begin(), s.begin() + p);
00314 partialMessage = new MessageBlock();
00315 if(s.size() > 0) {
00316 p = partialMessage->parseUpdate(s);
00317 last = partialMessage->lastMessage();
00318 if(&last) {
00319 LOG("remotestreamsite", 4, "saw msg with nonce '" << last->getNonce() << "'");
00320 if(last->getNonce() != "" && waitingFor.count(last->getNonce())) {
00321 LOG("remotestreamsite", 4, "caching msg with nonce " << last->getNonce());
00322 waitingFor[last->getNonce()].first = true;
00323 }
00324 }
00325 } else break;
00326 }
00327 } while(1);
00328 }
00329
00330 void RemoteSocketSite::flushOutgoingBuffers(const char* newdata, unsigned int newsize)
00331 {
00332 if(outputBuffer.size() == 0 && newsize == 0) return;
00333
00334 const char* data;
00335 unsigned int datasize;
00336 bool buffered;
00337 if(outputBuffer.size() == 0) {
00338 data = newdata;
00339 datasize = newsize;
00340 buffered = false;
00341 } else {
00342 if(newsize > 0) outputBuffer.append(newdata, newsize);
00343 data = outputBuffer.c_str();
00344 datasize = outputBuffer.size();
00345 buffered = true;
00346 }
00347
00348 LocalSocketSite* lss = dynamic_cast<LocalSocketSite*>(localpeer);
00349
00350 int total = 0;
00351 do {
00352 int sendsize = (datasize - total) < sockSendBufSz ? (datasize - total) : sockSendBufSz;
00353 int ret = writeStream(data + total, sendsize);
00354 if(ret == -1) {
00355 #ifdef WIN32
00356 int err = WSAGetLastError();
00357 if(err == WSAEWOULDBLOCK || err == WSAEINTR) {
00358 #else
00359 if(errno == EWOULDBLOCK || errno == EINTR) {
00360 #endif
00361 if(buffered) outputBuffer.replace(0, total, "");
00362 else outputBuffer.append(data + total, datasize - total);
00363 lss->addFDtoSelect(writingFD, LocalSocketSite::SELECTWRITE);
00364 return;
00365 } else throw SiteConnectionError("disconnected");
00366 } else total += ret;
00367 } while(total < datasize);
00368
00369 if(buffered) outputBuffer.replace(0, total, "");
00370 if(outputBuffer.size() == 0) {
00371 lss->removeFDfromSelect(writingFD, LocalSocketSite::SELECTWRITE);
00372 }
00373 }
00374
00375 #ifndef MSG_NOSIGNAL
00376 #define MSG_NOSIGNAL 0
00377 #endif
00378
00379 int RemoteSocketSite::readStream(char* data, unsigned int datasize)
00380 {
00381 #ifdef SSL_SUPPORT
00382 if(ssl) {
00383 int ret = SSL_read(ssl, data, datasize);
00384 if(ret <= 0) {
00385 if(ERR_get_error() == SSL_ERROR_WANT_READ) {
00386 errno = EAGAIN;
00387 } else {
00388 LOG("remotestreamsite", 2, "SSL error " << ERR_error_string(ERR_get_error(), 0));
00389 }
00390 }
00391 return ret;
00392 } else {
00393 #endif
00394 #ifdef SOCKS_SUPPORT
00395 if(usingSOCKS) {
00396 return Rrecv(readingFD, data, datasize, MSG_NOSIGNAL);
00397 } else {
00398 #endif
00399 return recv(readingFD, data, datasize, MSG_NOSIGNAL);
00400 #ifdef SOCKS_SUPPORT
00401 }
00402 #endif
00403 #ifdef SSL_SUPPORT
00404 }
00405 #endif
00406 }
00407
00408 int RemoteSocketSite::writeStream(const char* data, unsigned int datasize)
00409 {
00410 #ifdef SSL_SUPPORT
00411 if(ssl) {
00412 int ret = SSL_write(ssl, data, datasize);
00413 if(ret <= 0) {
00414 if(ERR_get_error() == SSL_ERROR_WANT_WRITE) {
00415 errno = EAGAIN;
00416 } else {
00417 LOG("remotestreamsite", 2, "SSL error " << ERR_error_string(ERR_get_error(), 0));
00418 }
00419 }
00420 return ret;
00421 } else {
00422 #endif
00423 #ifdef SOCKS_SUPPORT
00424 if(usingSOCKS) {
00425 return Rsend(writingFD, data, datasize, MSG_NOSIGNAL);
00426 } else {
00427 #endif
00428 return send(writingFD, data, datasize, MSG_NOSIGNAL);
00429 #ifdef SOCKS_SUPPORT
00430 }
00431 #endif
00432 #ifdef SSL_SUPPORT
00433 }
00434 #endif
00435 }
00436
00437 void RemoteSocketSite::secureConnection(const string& protocol)
00438 {
00439 #ifdef SSL_SUPPORT
00440 if(ssl) throw RemoteError("A secure connection has already been established\n");
00441
00442 SSL_CTX* ctx = meta_cast<LocalSocketSite*>(localpeer)->getSSLContext(protocol);
00443
00444 pREF(Message*, m, new Message(),
00445 rREF(LocalSite&, ls, initFields(this, m, "core:protocol-switch", true), );
00446 m->insertField(-1, "protocol", protocol);
00447 sendMessage(m);
00448
00449 suppressOutgoing(true);
00450 try {
00451 pREF(Message*, r, localpeer->waitFor(m->getNonce(), this),
00452 if(r->getField("result").value == "ready") {
00453 ssl = SSL_new(ctx);
00454 if(! ssl) {
00455 throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00456 }
00457
00458 int ret = SSL_set_wfd(ssl, writingFD);
00459 if(ret != 1) {
00460 throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00461 }
00462
00463 ret = SSL_set_rfd(ssl, readingFD);
00464 if(ret != 1) {
00465 throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00466 }
00467
00468 ret = SSL_connect(ssl);
00469 if(ret != 1) {
00470 throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00471 }
00472
00473 enableOutgoing(true);
00474 } else {
00475 enableOutgoing(true);
00476 throw RemoteError(r->getField("result").value);
00477 }
00478 );
00479 } catch(Message::NoSuchFieldError) {
00480 enableOutgoing(true);
00481 throw RemoteError("reply did not contain a result field");
00482 } catch(...) {
00483 enableOutgoing(true);
00484 throw;
00485 }
00486 );
00487 #else
00488 throw RemoteError("libvos was not compiled with SSL/TLS support");
00489 #endif
00490 }
00491
00492 void RemoteSocketSite::switchProtocol(SSL_CTX* context)
00493 {
00494 #ifdef SSL_SUPPORT
00495 ssl = SSL_new(context);
00496 if(! ssl) {
00497 throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00498 }
00499
00500 int ret = SSL_set_wfd(ssl, writingFD);
00501 if(ret != 1) {
00502 throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00503 }
00504
00505 ret = SSL_set_rfd(ssl, readingFD);
00506 if(ret != 1) {
00507 throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00508 }
00509
00510 ret = SSL_accept(ssl);
00511 if(ret != 1) {
00512 throw LocalSocketSite::SSLError(ERR_error_string(ERR_get_error(), 0));
00513 }
00514 #else
00515 throw RemoteError("libvos was not compiled with SSL/TLS support");
00516 #endif
00517 }
00518
00519 X509* RemoteSocketSite::getCertificate()
00520 {
00521 #ifdef SSL_SUPPORT
00522 if(ssl) return SSL_get_peer_certificate(ssl);
00523 else throw RemoteError("No secure session established");
00524 #else
00525 throw RemoteError("libvos was not compiled with SSL/TLS support");
00526 #endif
00527 }
00528
00529 bool RemoteSocketSite::isConnected()
00530 {
00531 return (readingFD > -1);
00532 }
00533
00534 bool OutgoingLock::checkMessage(Message* m)
00535 {
00536 return (m->getMethod() == "core:anti-spoof-check"
00537 || m->getMethod() == "core:anti-spoof-reply"
00538 || m->getMethod() == "core:error");
00539 }
00540
00541 bool EnqueueOutgoing::checkMessage(Message* m)
00542 {
00543 if(m->getMethod() == "core:anti-spoof-check"
00544 || m->getMethod() == "core:anti-spoof-reply"
00545 || m->getMethod() == "core:error")
00546 {
00547 return true;
00548 } else {
00549 m->acquire();
00550 outgoingQueue.push(pair<Message*, MessageBlock*>(m, 0));
00551 return false;
00552 }
00553 }
00554
00555 bool EnqueueOutgoing::checkMessage(MessageBlock* m)
00556 {
00557 m->acquire();
00558 outgoingQueue.push(pair<Message*, MessageBlock*>(0, m));
00559 return true;
00560 }
00561
00562 bool RewriteTo::checkMessage(Message* m)
00563 {
00564 URL to(m->getTo());
00565 to.setHostAndPort(hostandport);
00566 m->setTo(to.getString());
00567 return true;
00568 }
00569
00570 void RemoteSocketSite::suppressOutgoing(bool enqueue)
00571 {
00572 if(enqueue) {
00573 insertMessageFilter(0, &enqueueOutgoing);
00574 } else {
00575 insertMessageFilter(-1, &outgoinglock);
00576 }
00577 }
00578
00579 void RemoteSocketSite::enableOutgoing(bool sendqueue, const string& rewriteTo)
00580 {
00581 removeMessageFilter(&outgoinglock);
00582 removeMessageFilter(&enqueueOutgoing);
00583
00584 RewriteTo rt(rewriteTo);
00585 if(rewriteTo != "?") {
00586 insertMessageFilter(-1, &rt);
00587 }
00588
00589 while(!enqueueOutgoing.outgoingQueue.empty()) {
00590 if(enqueueOutgoing.outgoingQueue.front().first) {
00591 if(sendqueue) sendMessage(enqueueOutgoing.outgoingQueue.front().first);
00592 enqueueOutgoing.outgoingQueue.front().first->release();
00593 }
00594 if(enqueueOutgoing.outgoingQueue.front().second) {
00595 if(sendqueue) sendMessage(enqueueOutgoing.outgoingQueue.front().second);
00596 enqueueOutgoing.outgoingQueue.front().second->release();
00597 }
00598 enqueueOutgoing.outgoingQueue.pop();
00599 }
00600
00601 removeMessageFilter(&rt);
00602 }
00603
00604 void RemoteSocketSite::sendMessage(Message* m)
00605 {
00606 for(unsigned int i = 0; i < messagefilters.size(); i++) {
00607 if(! messagefilters[i]->checkMessage(m)) {
00608 LOG("remotesocketsite", 3, "Not sending filtered message");
00609 return;
00610 }
00611 }
00612
00613 Site::sendMessage(m);
00614 LOG("RemoteStreamSite::sendMessage", 4, ">>>(outgoing)>>> sending to " << getURL().getString()
00615 << " [" << m->refcount_debug
00616 << "/" << m->getCount() << "]\n" << m->getFormattedString().substr(0, 768));
00617 LOG("refcount", 5, "sendMessage: count on this remotesite is " << getCount());
00618
00619 if(writingFD > -1) {
00620 const string& s = m->getFormattedString();
00621 flushOutgoingBuffers(s.c_str(), (unsigned int)s.size());
00622 } else throw SiteConnectionError("Tried to send a message to a disconnected site");
00623 }
00624
00625 void RemoteSocketSite::sendMessage(MessageBlock* m)
00626 {
00627 for(unsigned int i = 0; i < messagefilters.size(); i++) {
00628 if(! messagefilters[i]->checkMessage(m)) {
00629 return;
00630 }
00631 }
00632
00633 if(! m->numMessages()) return;
00634
00635 Site::sendMessage(m);
00636 LOG("RemoteStreamSite::sendMessage", 4, ">>>(outgoing)>>> sending to " << getURL().getString() << "\n" << m->getString());
00637 LOG("refcount", 5, "sendMessage: count on this remotesite is " << getCount());
00638
00639 if(writingFD > -1) {
00640 const string& s = m->getString();
00641 flushOutgoingBuffers(s.c_str(), (unsigned int)s.size());
00642 } else throw SiteConnectionError("Tried to send message to disconnected site");
00643 }
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656 AsyncConnect::AsyncConnect(bool* isdone, RemoteSocketSite** rs, LocalSite* ls,
00657 const string& host, unsigned short int port, bool isspooftest)
00658 : done(isdone), remotesite(rs), localpeer(ls), remotehost(host), remoteport(port), spooftest(isspooftest)
00659 {
00660 *done = false;
00661 *remotesite = 0;
00662 }
00663
00664 RemoteSocketSite* AsyncConnect::connect(const string& h, unsigned short int p, LocalSite* defaultPeer, bool isspooftest)
00665 {
00666 RemoteSocketSite* rss;
00667 bool isdone;
00668
00669 AsyncConnect ac(&isdone, &rss, defaultPeer, h, p, isspooftest);
00670
00671 boost::thread connectthread(ac);
00672 double timeout = defaultPeer->getTimeoutOnSelect();
00673
00674 if(timeout == -1 || timeout > 1) defaultPeer->setTimeoutOnSelect(.2);
00675 while(!isdone) {
00676 defaultPeer->flushIncomingBuffers();
00677 }
00678
00679 connectthread.join();
00680
00681 defaultPeer->setTimeoutOnSelect(timeout);
00682
00683 if(rss == 0) {
00684 throw RemoteSite::SiteConnectionError(ac.exstring);
00685 }
00686
00687 double start = getRealTime();
00688 while(rss->isConnected() && !rss->getGreeted() && (getRealTime() - start) < VOS_DEFAULT_TIMEOUT) {
00689 defaultPeer->flushIncomingBuffers();
00690 }
00691
00692 if(!rss->isConnected() || !rss->getGreeted()) {
00693 rss->handleDisconnection();
00694 rss->release();
00695 throw RemoteSite::SiteConnectionError("Remote site disconnected or did not greet us");
00696 }
00697
00698 return rss;
00699 }
00700
00701
00702 void AsyncConnect::operator()()
00703 {
00704 try {
00705 LOG("AsyncConnect", 3, "ready to start connecting to " << remotehost << ":" << remoteport);
00706 *remotesite = new RemoteSocketSite(remotehost, remoteport);
00707
00708 if(spooftest) (*remotesite)->addFlag("spoof-test");
00709
00710 if(getenv("VOS_HOSTNAME") == 0) {
00711 unsigned char ipaddr[4];
00712 string host = Site::detectHostname((*remotesite)->getReadingFD(), ipaddr);
00713 LOG("AsyncConnect", 3, "detected IP is "
00714 << (unsigned int)ipaddr[0] << "." << (unsigned int)ipaddr[1] << "."
00715 << (unsigned int)ipaddr[2] << "." << (unsigned int)ipaddr[3]);
00716 URL u(localpeer->getURL());
00717 u.setHost(host);
00718 localpeer->addHostAlias(u.getHostAndPort());
00719 if(!(ipaddr[0] == 10 || ipaddr[0] == 127 || (ipaddr[0] == 192 && ipaddr[1] == 168))) {
00720 localpeer->setPrimaryHostname(host);
00721 }
00722 }
00723 Site::doSitePeering(localpeer, *remotesite, spooftest, false);
00724 } catch(runtime_error e) {
00725 exstring = e.what();
00726 LOG("remotesocketsite", 2, "Caught exception " << exstring);
00727 }
00728 *done = true;
00729 LOG("AsyncConnect", 3, "done trying to connect to " << remotehost << ":" << remoteport);
00730 }