Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Namespace Members | Compound Members | File Members | Related Pages | Examples

vos/corelibs/vos/localsite.cc

Go to the documentation of this file.
00001 /*
00002     This file is part of the Virtual Object System of
00003     the Interreality project (http://interreality.org).
00004 
00005     Copyright (C) 2001-2003 Peter Amstutz
00006 
00007     This library is free software; you can redistribute it and/or
00008     modify it under the terms of the GNU Lesser General Public
00009     License as published by the Free Software Foundation; either
00010     version 2 of the License, or (at your option) any later version.
00011 
00012     This library is distributed in the hope that it will be useful,
00013     but WITHOUT ANY WARRANTY; without even the implied warranty of
00014     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015     Lesser General Public License for more details.
00016 
00017     You should have received a copy of the GNU Lesser General Public
00018     License along with this library; if not, write to the Free Software
00019     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00020 
00021     Peter Amstutz <tetron@interreality.org>
00022 */
00023 
00024 #include "localsite.hh"
00025 #include "accesscontrol.hh"
00026 #include "remotesocketsite.hh"
00027 #include "message.hh"
00028 #include "messageblock.hh"
00029 #include "timer.hh"
00030 
00031 using namespace VOS;
00032 
00033 #include <sys/types.h>
00034 #include <fcntl.h>
00035 #include <stdio.h>
00036 #include <errno.h>
00037 #include <math.h>
00038 #include <cstdarg>
00039 #include <algorithm>
00040 #include <typeinfo>
00041 
00042 multimap<string, metaobject_extender_t> LocalSite::localObjectExtensionTable;
00043 
00044 LocalSite::LocalSite()
00045     : VobjectImplementation("", 0, true), LocalVobject("", 0, 0),
00046       LocalMetaObject("", 0, 0), MetaObject(0), selectwait(new struct timeval()),
00047       callbacklock(false), notifyflushlock(false)
00048 {
00049     setGreeted(true);
00050 }
00051 
00052 LocalSite::~LocalSite()
00053 {
00054 }
00055 
00056 void LocalSite::scheduleMessageBlock(MessageBlock* mb, MessageContext* mc, Site* ss,
00057                                      double plustime, const char* extradependency)
00058 {
00059     if(mb->getName() != "" && mc == 0) {
00060         ss->addMessageBlock(mb);
00061     } else {
00062         if(mb->numMessages() > 1 && !mc) {
00063             /* note: So as to prevent outgoing message traps from
00064                persisting for the entire session defaultContext is NOT
00065                a parent context.  It is used only for the special
00066                cases of "to", "from" and "method".
00067             */
00068             mc = new MessageContext(); // released: after the for loop
00069             try {
00070                 mc->setParameter("to", ss->defaultContext.getParameter("to"));
00071                 mc->setParameter("from", ss->defaultContext.getParameter("from"));
00072                 mc->setParameter("method", ss->defaultContext.getParameter("method"));
00073             } catch(MessageContext::NoSuchParameterError x) {
00074                 LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00075             }
00076         }
00077         else if(mc) mc->acquire(); // released: after the for loop
00078         for(int j=0; j < mb->numMessages(); j++) {
00079             vRef<Message> m = mb->getMessage(j);
00080 
00081             if(m->getType() == "default") {
00082                 if(mc) {
00083                     if(m->hasMethod()) mc->setParameter("method", m->getMethod());
00084                     if(m->hasTo()) mc->setParameter("to", m->getTo());
00085                     if(m->hasFrom()) mc->setParameter("from", m->getFrom());
00086                 } else {
00087                     if(m->hasMethod()) ss->defaultContext.setParameter("method", m->getMethod());
00088                     if(m->hasTo()) ss->defaultContext.setParameter("to", m->getTo());
00089                     if(m->hasFrom()) ss->defaultContext.setParameter("from", m->getFrom());
00090                 }
00091             } else if(m->getType() == "include") {
00092                 try {
00093                     vRef<MessageBlock> mb = ss->getMessageBlock(m->getMethod());
00094                     vRef<MessageContext> newmc = new MessageContext();
00095                     newmc->setParentContext(mc);
00096                     try {
00097                         if(m->hasTo()) newmc->setParameter("to", m->getTo());
00098                         else newmc->setParameter("to", ss->defaultContext.getParameter("to"));
00099 
00100                         if(m->hasFrom()) newmc->setParameter("from", m->getFrom());
00101                         else newmc->setParameter("from", ss->defaultContext.getParameter("from"));
00102 
00103                         if(m->hasMethod()) newmc->setParameter("method", m->getMethod());
00104                         else newmc->setParameter("method", ss->defaultContext.getParameter("method"));
00105                     } catch(MessageContext::NoSuchParameterError x) {
00106                         LOG("localsite", 3, "scheduleMessageBlock Error " << x.what());
00107                     }
00108                     for(int i = 0; i < m->getNumFields(); i++) {
00109                         const Message::Field& f = m->getField(i);
00110                         newmc->setParameter(f.key, f.source);
00111                     }
00112                     string extradep = m->getDependency();
00113                     mc->doSubstitution(extradep);
00114                     scheduleMessageBlock(&mb, &newmc, ss, plustime + m->getTime(), extradep.c_str());
00115                 } catch(NoSuchMessageBlockError) { }
00116             } else {
00117                 vRef<Message> mcopy = (mb->getName() == "" && mc==0) ? (m->acquire(), &m) : (new Message(*m));
00118                 if(mcopy != m && mcopy->getCount() != 1) LOG("refcount", 5, "count on new (copied) message is "
00119                                                              << mcopy->getCount());
00120                 if(extradependency) {
00121                     if(mcopy->getDependency() == "") mcopy->setDependency(extradependency);
00122                     else {
00123                         string s = mcopy->getDependency();
00124                         s += ",";
00125                         s += extradependency;
00126                         mcopy->setDependency(s);
00127                     }
00128                 }
00129                 mcopy->setSourceSite(ss);
00130                 mcopy->setMessageContext(mc);
00131                 if(mcopy->getDependency() != "") {
00132                     ss->trapOutgoingReply(&mcopy);
00133                 }
00134                 if(mc) {
00135                     try {
00136                         if(! mcopy->hasMethod()) mcopy->setMethod(mc->getParameter("method"));
00137                         if(! mcopy->hasTo()) mcopy->setTo(mc->getParameter("to"));
00138                         if(! mcopy->hasFrom()) mcopy->setFrom(mc->getParameter("from"));
00139                     } catch(MessageContext::NoSuchParameterError x) {
00140                         LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00141                     }
00142                 } else {
00143                     try {
00144                         if(! mcopy->hasMethod()) mcopy->setMethod(ss->defaultContext.getParameter("method"));
00145                         if(! mcopy->hasTo()) mcopy->setTo(ss->defaultContext.getParameter("to"));
00146                         if(! mcopy->hasFrom()) mcopy->setFrom(ss->defaultContext.getParameter("from"));
00147                     } catch(MessageContext::NoSuchParameterError x) {
00148                         LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00149                     }
00150                 }
00151 
00152                 double now = getTimer();
00153 
00154 //    LOG("localsite", 3, "entered scheduleMessageBlock " << tv.tv_sec << "." << tv.tv_usec);
00155 
00156                 LOG("localsite", 4, "scheduling incoming message " << mcopy->incoming_debug);
00157                 mcopy->acquire(); // released by: runSchedule()
00158                 siteMessageQueue[now + plustime + mcopy->getTime()].push_back(&mcopy);
00159             }
00160         }
00161         if(mc) mc->release(); // acquired by: new MessageContext() or mc->acquire() before the for loop
00162     }
00163 }
00164 
00165 void LocalSite::addCallback(callback_t callback, void* userdata, double timeout, bool repeat) {
00166     for(map<double, CallbackInfo>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00167         if((*i).second.callback == callback) {
00168             (*i).second.userdata = userdata;
00169             (*i).second.timeout = timeout;
00170             (*i).second.repeat = repeat;
00171             return;
00172         } else i++;
00173     }
00174     scheduleCallback(callback, 0, userdata, timeout, repeat);
00175 }
00176 
00177 void LocalSite::addCallback(CallbackListener* listener, double timeout, bool repeat) {
00178     for(map<double, CallbackInfo>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00179         if((*i).second.listener == listener) {
00180             (*i).second.userdata = 0;
00181             (*i).second.timeout = timeout;
00182             (*i).second.repeat = repeat;
00183             return;
00184         } else i++;
00185     }
00186     scheduleCallback(0, listener, 0, timeout, repeat);
00187 }
00188 
00189 void LocalSite::scheduleCallback(callback_t callback, CallbackListener* listener, void* userdata, double timeout, bool repeat) {
00190     double now = getTimer();
00191     if(! callbacklock) {
00192         callbacks[now + timeout].callback = callback;
00193         callbacks[now + timeout].listener = listener;
00194         callbacks[now + timeout].userdata = userdata;
00195         callbacks[now + timeout].timeout = timeout;
00196         callbacks[now + timeout].repeat = repeat;
00197     } else {
00198         pair<double, CallbackInfo> p;
00199         p.first = now + timeout;
00200         p.second.callback = callback;
00201         p.second.listener = listener;
00202         p.second.userdata = userdata;
00203         p.second.timeout = timeout;
00204         p.second.repeat = repeat;
00205         callbackaddqueue.push(p);
00206     }
00207 }
00208 
00209 void LocalSite::removeCallback(callback_t callback)
00210 {
00211     if(! callbacklock) {
00212         for(map<double, CallbackInfo>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00213             if((*i).second.callback == callback) {
00214                 map<double, CallbackInfo>::iterator tmp = i;
00215                 i++;
00216                 callbacks.erase(tmp);
00217             } else i++;
00218         }
00219     } else {
00220         callbackremovequeue.push(callback);
00221     }
00222 }
00223 
00224 void LocalSite::removeCallback(CallbackListener* listener)
00225 {
00226     if(! callbacklock) {
00227         for(map<double, CallbackInfo>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00228             if((*i).second.listener == listener) {
00229                 map<double, CallbackInfo>::iterator tmp = i;
00230                 i++;
00231                 callbacks.erase(tmp);
00232             } else i++;
00233         }
00234     } else {
00235         callback_listener_remove_queue.push(listener);
00236     }
00237 }
00238 
00239 void LocalSite::doCallbacks()
00240 {
00241     if(! callbacklock) { // callback could call runSchedule()
00242         callbacklock = true;
00243 
00244         double now = getTimer();
00245 
00246         LOG("localsite", 5, "Running callbacks (" << (unsigned int)callbacks.size() << ")");
00247 
00248         for(map<double, CallbackInfo>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00249             if((*i).first <= now) {
00250                 try {
00251                     if ( (*i).second.callback )
00252                         (*i).second.callback((*i).second.userdata);
00253                     else if ( (*i).second.listener )
00254                         (*i).second.listener->notifySiteCallback();
00255                     else
00256                         LOG("localsite", 1, "Warning: Callback has neither a callback function nor a listener object!");
00257 
00258                 } catch(exception& e) {
00259                     LOG("localsite", 1, "Callback emitted exception: " << e.what());
00260                 } catch(...) {
00261                     LOG("localsite", 1, "Callback emitted unknown exception!");
00262                 }
00263 
00264                 if((*i).second.repeat) {
00265                     pair<double, CallbackInfo> p;
00266                     p.first = now + (*i).second.timeout;
00267                     p.second = (*i).second;
00268                     callbackaddqueue.push(p);
00269                     LOG("localsite", 5, "resetting callback to trigger in " << (*i).second.timeout << " seconds");
00270                 }
00271 
00272                 map<double, CallbackInfo>::iterator tmp = i;
00273                 i++;
00274                 callbacks.erase(tmp);
00275             } else break;
00276         }
00277 
00278         callbacklock = false;
00279 
00280         while(! callbackaddqueue.empty())
00281         {
00282             callbacks[callbackaddqueue.front().first] = callbackaddqueue.front().second;
00283             callbackaddqueue.pop();
00284         }
00285 
00286         while(! callbackremovequeue.empty())
00287         {
00288             removeCallback(callbackremovequeue.front());
00289         }
00290 
00291         while(! callback_listener_remove_queue.empty())
00292         {
00293             removeCallback(callback_listener_remove_queue.front());
00294         }
00295 
00296         LOG("localsite", 5, "Done with callbacks");
00297     }
00298 }
00299 
00300 void LocalSite::runSchedule()
00301 {
00302     double now = getTimer();
00303 
00304     LOG("localsite", 5, "Running schedule");
00305 
00306     bool inc_i;
00307     for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin(); i != siteMessageQueue.end(); inc_i ? i++ : 0) {
00308         inc_i = true;
00309         if((*i).first <= now) {
00310             bool inc_q;
00311             for(list<Message*>::iterator q = (*i).second.begin(); q != (*i).second.end(); inc_q ? q++ : 0) {
00312                 inc_q = true;
00313                 MessageContext* mc = (*q)->getMessageContext(); // released by: mc->release() below
00314                 bool depsatisfied=true;
00315                 string dep = (*q)->getDependency();
00316                 if((*q)->getDependency() != "" && mc) {
00317                     unsigned int i=0;
00318                     unsigned int n=0;
00319                     while(i < dep.size()) {
00320                         while(i < dep.size() && dep[i] != ',') i++;
00321                         LOG("localsite", 5, "--- 0 depsatisfied " << depsatisfied << " ---");
00322                         LOG("localsite", 5, "dep.substr: " << dep.substr(n, i-n));
00323                         depsatisfied &= mc->replyRecordContains(dep.substr(n, i-n));
00324                         LOG("localsite", 5, "1 depsatisfied " << depsatisfied);
00325                         n=++i;
00326                     }
00327                 }
00328                 LOG("localsite", 5, "2 depsatisfied " << depsatisfied);
00329                 bool removeQ = false;
00330                 if(depsatisfied) {
00331                     map<string, RemoteSite*>::iterator r;
00332                     vRef<Site> s = (*q)->getSourceSite();
00333                     if(&s && (s->getGreeted() || ((*q)->getMethod() == "core:hello" && (*q)->getType() == "message")))
00334                     {
00335                         URL to((*q)->getTo());
00336                         if(! pendingValidationHosts.count(to.getHostAndPort())) {
00337                             r = prioritized.find((*q)->getNonce());
00338                             if(r != prioritized.end() && (*r).second == &s) prioritizedQueue.push_back(*q);
00339                             else
00340                             msgsReady.push_back(*q);
00341 
00342                             LOG("localsite", 4, "adding msg " << (*q)->incoming_debug << " to schedule with time");
00343                             //fprintf(stderr, "%16.16f\n", (*i).first);
00344                         }
00345 
00346                         removeQ = true;
00347 
00348                     } else {
00349                         LOG("localsite", 4, "can't deliver mesage " << (*q)->getMethod() << " until source site greets us");
00350                         if(!s->isConnected() && !s->getGreeted()) removeQ = true;
00351                     }
00352                 } else {
00353                     vRef<Site> s = (*q)->getSourceSite();
00354                     if(! s->isConnected()) removeQ = true;
00355                 }
00356                 if(removeQ) {
00357                     list<Message*>::iterator old = q;
00358                     q++;
00359                     inc_q = false;
00360                     (*i).second.erase(old);
00361                     if(q == (*i).second.end()) {
00362                         if(mc) mc->release(); // acquired by: getMessageContext() above
00363                         break;
00364                     }
00365                 }
00366                 if(mc) mc->release(); // acquired by: getMessageContext() above
00367             }
00368             if((*i).second.size() == 0) {
00369                 map<double, list<Message*> >::iterator old = i;
00370                 i++;
00371                 inc_i = false;
00372                 siteMessageQueue.erase(old);
00373                 if(i == siteMessageQueue.end()) break;
00374             }
00375         } else break;
00376     }
00377 
00378     LOG("localsite", 5, "Done building schedule (" << (unsigned int)msgsReady.size() << ", " << (unsigned int)prioritizedQueue.size() << ")");
00379 
00380     if(msgsReady.empty() && prioritizedQueue.empty()) {
00381         LOG("localsite", 5, "Done running schedule");
00382 
00383         flushNotifications();
00384         doCallbacks();
00385 
00386         return;
00387     }
00388 
00389     if(! prioritizedQueue.empty()) {
00390         //while(! prioritizedQueue.empty()) {
00391             vRef<Message> m = prioritizedQueue.front();
00392             prioritizedQueue.pop_front();
00393             vRef<Site> s = m->getSourceSite();
00394             if(&s) LOG("refcount", 5, "scheduleMessageBlock refcount on sourcesite is " << s->getCount());
00395             try {
00396                 sendMessage(&m);
00397             } catch(runtime_error e) {
00398                 LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00399             } catch(...) {
00400                 LOG("localsite", 2, "sendMessage emitted unknown exception");
00401             }
00402             if(!s->isConnected() && !checkScheduleHoldsSite(&s)) {
00403                 Site::removeSite(*s);
00404             }
00405         //}
00406         return;
00407     }
00408 
00409     while(! msgsReady.empty()) {
00410         vRef<Message> m = msgsReady.front();
00411         msgsReady.pop_front();
00412         vRef<Site> s = m->getSourceSite();
00413         if(&s) LOG("refcount", 5, "scheduleMessageBlock refcount on sourcesite is " << s->getCount());
00414         LOG("msgrefcount", 5, "about to send message number " << m->refcount_debug << " with count " << m->getCount());
00415 
00416         for(unsigned int i = 0; i < messagefilters.size(); i++) {
00417             if(! messagefilters[i]->checkMessage(&m)) {
00418                 LOG("localsite", 4, "Dropping filtered message: " << m->getLoggableString());
00419                 continue;
00420             }
00421         }
00422 
00423         try {
00424             sendMessage(&m);
00425         } catch(exception& e) {
00426             LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00427         } catch(...) {
00428             LOG("localsite", 2, "sendMessage emitted unknown exception");
00429         }
00430 
00431         if(!s->isConnected() && !checkScheduleHoldsSite(&s)) {
00432             Site::removeSite(*s);
00433         }
00434     }
00435 
00436     runSchedule();
00437 }
00438 
00439 void LocalSite::removeRemotePeer(RemoteSite* rs)
00440 {
00441     if(iteratorsUsingPeerSites == 0) {
00442         map<RemoteSite*, SiteTableEntry*>::iterator i = peerSites.find(rs);
00443         if(i != peerSites.end()) {
00444             LOG("refcount", 5, "localsite: removing remote peer, count is " << (*i).first->getCount());
00445             (*i).first->release(); // acquired by: doSitePeering()
00446             if((*i).second->partialMessage)
00447                 (*i).second->partialMessage->release(); // acquired by: flushIncomingBuffers()
00448             delete (*i).second;
00449             peerSites.erase(i);
00450         }
00451     } else {
00452         boost::mutex::scoped_lock lock(peerSitesBuffer_mutex);
00453         peerSitesBuffer_remove.push_back(rs);
00454     }
00455 }
00456 
00457 bool LocalSite::checkScheduleHoldsSite(Site* site)
00458 {
00459     for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin();
00460         i != siteMessageQueue.end(); i++)
00461     {
00462         for(list<Message*>::iterator n = (*i).second.begin();
00463             n != (*i).second.end(); n++)
00464         {
00465             bool ret = false;
00466             pREF(Site*, s, (*n)->getSourceSite(),
00467                  if(s == site) ret = true;
00468                  );
00469             if(ret) return true;
00470         }
00471     }
00472     return false;
00473 }
00474 
00475 void LocalSite::sendMessage(MessageBlock* mb)
00476 {
00477     Site::sendMessage(mb);
00478     LocalVobject::sendMessage(mb);
00479 }
00480 
00481 void LocalSite::verifyCheckIDPair(RemoteSocketSite* rss, const string& spoofID)
00482 {
00483     vRef<Message> nm = new Message();
00484     nm->setType("message");
00485     nm->setTo(rss->getURL().getString());
00486     nm->setMethod("core:anti-spoof-check");
00487     nm->generateNonce();
00488     nm->insertField(-1, "peek", spoofID);
00489     rss->sendMessage(&nm);
00490 }
00491 
00492 void LocalSite::sendMessage(Message* m)
00493 {
00494     /*static int nextexpected = 1;
00495     if(m->incoming_debug != nextexpected) {
00496         LOG("localsite", 1, "Inconsistant incoming!  Expected " << nextexpected << " but got " << m->incoming_debug);
00497         nextexpected = m->incoming_debug + 1;
00498         } else nextexpected++;*/
00499 
00500     vRef<Site> sourcesite = m->getSourceSite();
00501     LOG("LocalSite::sendMessage", 3, "Incoming message: " << m->getMethod() << " (to site " << getURL().getString()
00502         << "  from " << (sourcesite==0 ? "NULL" : sourcesite->getURL().getString()) << ")");
00503     LOG("LocalSite::sendMessage", 4, "<<<(incoming)<<< sending to " << getURL().getString() << " [" << m->refcount_debug
00504         << "/" << m->incoming_debug << "/" << m->getCount() << "]\n" << m->getLoggableString().substr(0, 768));
00505 
00506     Site::sendMessage(m);
00507 
00508     if(m->getType() == "message") {
00509         if(m->getMethod() == "core:hello") {
00510             vRef<RemoteSocketSite> st = dynamic_cast<RemoteSocketSite*>(m->getSourceSite());
00511             LOG("refcount", 5, "localsite sendMessage: got handshake, refcount on site is " << st->getCount());
00512             string checkid;
00513             bool foundFavorite = false;
00514             bool shouldGreet = true;
00515             for(int i=0; i < m->getNumFields(); i++) {
00516                 const Message::Field& a=m->getField(i);
00517                 if(a.key == "antispoof") {
00518                     checkid = a.value;
00519                     setAntiSpoofIDMapping(checkid, st->getAntiSpoofID());
00520                 }
00521                 if(a.key == "alias" && !hasHostAlias(a.value) && !st->checkFlag("spoof-test")) {
00522                     LOG("localsite", 3, "Validating alias " << a.value);
00523                     bool checkit = false;
00524                     if(! getAllSites().count(a.value)) checkit = true;
00525                     else {
00526                         vRef<RemoteSocketSite> existingsite = meta_cast<RemoteSocketSite*>(&findSite(a.value));
00527                         if(!&existingsite) throw bad_cast();
00528                         if(!(*st == *existingsite)) {
00529                             if(! existingsite->isConnected()) {
00530                                 removeSite(*existingsite);
00531                                 checkit = true;
00532                             } else {
00533                                 //LOG("localsite", 2, "We have a multiconnection.  Verifying...");
00534                                 LOG("localsite", 2, "May need to bind site " << st->getURL().getString()
00535                                     << " to site " << a.value);
00536                                 // do something?
00537                             }
00538                         }
00539                     }
00540                     if(checkit) {
00541                         if(checkid != "") {
00542                             string host;
00543                             int port;
00544                             unsigned int i;
00545                             for(i = 0; i < a.value.size() && a.value[i] != ':'; i++);
00546                             host = a.value.substr(0, i);
00547                             if(i < a.value.size()) port = atoi(a.value.substr(i+1).c_str());
00548                             else port = VOS_DEFAULT_PORT;
00549                             try {
00550                                 vRef<RemoteSocketSite> rss = AsyncConnect::connect(host, port, this, true);
00551 
00552                                 rss->removeHostAlias(a.value);
00553                                 st->addHostAlias(a.value);
00554 
00555                                 // the AntiSpoofID is the nonce *we* sent with core:hello
00556                                 verifyCheckIDPair(&rss, st->getAntiSpoofID());
00557                                 st->suppressOutgoing(true);
00558 
00559                                 rss->acquire(); // released by: handler for anti-spoof-reply
00560                                 st->acquire(); // released by:  handler for anti-spoof-reply
00561                                 pendingValidations[&rss].realsite = st;
00562                                 pendingValidations[&rss].checkid = checkid;
00563                                 pendingValidations[&rss].hostalias = a.value;
00564                                 pendingValidations[&rss].favored = ! foundFavorite;
00565                                 pendingValidationHosts.insert(a.value);
00566                                 foundFavorite = true;
00567                                 shouldGreet = false;
00568                             } catch(RemoteSite::SiteConnectionError) {
00569                                 LOG("localsite", 1, "Tried to verify '" << a.value << "' but could not connect.");
00570                                 vRef<Message> err = new Message();
00571                                 err->setType("message");
00572                                 err->setMethod("core:error");
00573                                 err->insertField(-1, "error", "Could not verify your hostname '"
00574                                                  + a.value + "'");
00575                                 st->sendMessage(&err);
00576                             }
00577                         }
00578                     } else {
00579                         if(st->getURL().getHostAndPort() != a.value) {
00580                             LOG("localsite", 1, "Attempted spoof! '" << st->getURL().getString() << "' claimed to be '"
00581                                 << a.value << "' but failed verification!");
00582                         }
00583                     }
00584                 }
00585             }
00586             st->setGreeted(shouldGreet);
00587             LOG("refcount", 5, "localsite sendMessage: done with hello, refcount on site is " << st->getCount());
00588         } else if(m->getMethod() == "core:anti-spoof-check") {
00589             try {
00590                 vRef<RemoteSite> st = dynamic_cast<RemoteSite*>(m->getSourceSite());
00591                 const Message::Field& f = m->getField("peek");
00592                 string spid = getAntiSpoofIDMapping(f.value);
00593                 if(spid == "") {
00594                     st->acquire(); // released by: doSitePeering
00595                     needSpoofIDreply.push_back(pair<string, RemoteSite*>(f.value, &st));
00596                 } else {
00597                     vRef<Message> reply = new Message();
00598                     initReply(this, &reply, m, "core:anti-spoof-reply");
00599                     reply->setFrom("");
00600                     reply->setType("message");
00601                     reply->insertField(-1, "poke", spid);
00602                     st->sendMessage(&reply);
00603                 }
00604             } catch(...) { }
00605         } else if(m->getMethod() == "core:anti-spoof-reply") {
00606             vRef<RemoteSocketSite> rss = dynamic_cast<RemoteSocketSite*>(m->getSourceSite());
00607             if(pendingValidations.count(&rss)) {
00608                 string hostalias = pendingValidations[&rss].hostalias;
00609                 vRef<RemoteSocketSite> st = pendingValidations[&rss].realsite;
00610 
00611                 bool isok=false;
00612                 try {
00613                     if(m->getField("poke").value == pendingValidations[&rss].checkid) isok = true;
00614                 } catch(Message::NoSuchFieldError) {
00615                 }
00616 
00617                 rss->handleDisconnection();
00618                 if(isok) {
00619                     LOG("localsite", 3, "Adding verified host alias " << hostalias
00620                         << " to " << st->getURL().getString());
00621                     pendingValidationHosts.erase(hostalias);
00622                     if(pendingValidations[&rss].favored) {
00623                         URL u = st->getURL();
00624                         u.setHostAndPort(hostalias);
00625                         st->setURL(u);
00626                         LOG("localsite", 3, "New preferred alias: " << st->getURL().getString());
00627                     }
00628                     st->setGreeted(true);
00629                     st->enableOutgoing(true, st->getURL().getHostAndPort());
00630                 } else {
00631                     LOG("localsite", 1, "Attempted spoof?  Was given " << hostalias
00632                         << " by " << st->getURL().getString() << " but could not verify");
00633                     vRef<Message> err = new Message();
00634                     err->setType("message");
00635                     err->setMethod("core:error");
00636                     err->insertField(-1, "error", "Could not verify your hostname '" + hostalias
00637                                      + "' (I'm not going to talk to you. Sorry).");
00638                     st->sendMessage(&err);
00639                     st->removeHostAlias(hostalias);
00640                 }
00641                 pendingValidations.erase(&rss);
00642                 rss->release(); // acquired by: handler for core:hello
00643             }
00644             return; // no reason to process this message any more
00645         } else if(m->getMethod() == "core:error") {
00646             try {
00647                 LOG("localsite", 1, "Got error: " << m->getField("error").value);
00648             } catch(Message::NoSuchFieldError) { }
00649         } else if(m->getMethod() == "core:create-object") {
00650             vRef<Message> reply = new Message();
00651             initReply(this, &reply, m, "core:set-child-update");
00652             string newobjname;
00653             deque<string> newobjtypes;
00654             try {
00655                 for(int i=0; i < m->getNumFields(); i++) {
00656                     const Message::Field& f = m->getField(i);
00657                     if(f.key == "type") {
00658                         newobjtypes.push_back(f.value);
00659                     } else if(f.key == "name") {
00660                         newobjname = f.value;
00661                     }
00662                 }
00663             } catch(Message::NoSuchFieldError) { }
00664 
00665             vRef<Site> ss = m->getSourceSite();
00666 
00667             const deque<VobjectAccessControl*>& ac = getAccessControls();
00668             bool permitted = true;
00669             string errmessage;
00670             for(unsigned int i = 0; i < ac.size(); i++) {
00671                 SiteAccessControl* sac = dynamic_cast<SiteAccessControl*>(ac[i]);
00672                 if(!sac || !sac->checkCreateVobjectPermission(*ss, *this, newobjname, newobjtypes, errmessage)) {
00673                     permitted = false;
00674                     break;
00675                 }
00676             }
00677             if(ac.size() == 0) permitted = false;
00678 
00679             if(permitted) {
00680                 vRef<LocalMetaObject> newobj =
00681                     meta_cast<LocalMetaObject*>(createMetaObject(newobjname.c_str(), (char*)0));
00682 
00683                 for(deque<string>::iterator i = newobjtypes.begin();
00684                     i != newobjtypes.end(); i++)
00685                 {
00686                     string message;
00687                     vRef<VobjectEvent> event = new VobjectEvent(VobjectEvent::TypeInsert, *ss, *newobj, *i);
00688                     if(validateAccess(*event, message)) newobj->addType(*i, *ss);
00689                 }
00690                 vRef<ParentChildRelation> pcr = newobj->findParent(*this);
00691                 char n[16];
00692                 snprintf(n, sizeof(n), "%i", pcr->position);
00693                 reply->insertField(-1, "pos", n);
00694                 reply->insertField(-1, "name", pcr->contextual_name);
00695                 reply->insertField(-1, "path", newobj->getURL().getString());
00696                 const TypeSet& ts = newobj->getTypes();
00697                 if(ts.size() > 0) {
00698                     for(TypeSet::const_iterator ti = ts.begin(); ti != ts.end(); ti++) {
00699                         reply->insertField(-1, "type", *ti);
00700                     }
00701                 } else reply->insertField(-1, "type", "");
00702                 LOG("refcounted", 5, "Done creating meta object, count (after release) is " << (newobj->getCount()-1));
00703             } else {
00704                 reply->insertField(-1, "error", "Permission denied: " + errmessage);
00705             }
00706             try {
00707                 ss->sendMessage(&reply);
00708             } catch(...) { }
00709         }
00710     }
00711 
00712     if(&sourcesite && sourcesite->checkFlag("spoof-test")) return;
00713 
00714     try {
00715         URL to(m->getTo());
00716         URL from(m->getFrom());
00717 
00718         if(! getAllSites().count(to.getHostAndPort())) {
00719             LOG("localsite", 3, "dropping message with unknown to field " << to.getString());
00720             return;
00721         }
00722 
00723         map<string, Site*>::const_iterator sti = getAllSites().find(from.getHostAndPort());
00724         vRef<Site> fromsite;
00725         if(sti != getAllSites().end()) {
00726             fromsite = (*sti).second;
00727             fromsite->acquire();
00728         }
00729         if(! &fromsite) {
00730             LOG("localsite", 3, "dropping message with unknown from field " << from.getString());
00731             return;
00732         }
00733 
00734         LOG("refcount", 5, "localsite sendMessage: fromsite refcount is " << fromsite->getCount());
00735         vRef<Site> st = m->getSourceSite();
00736         if(! &st) {
00737             LOG("localsite", 3, "dropping message with no sourcesite!");
00738             return;
00739         }
00740 
00741         if(&fromsite != &st && st->isRemote()) {
00742             if(!st->checkFlag("spoof-test") && m->getMethod() != "core:type-add-update") {
00743                 LOG("sendMessage", 2, "Note: site in from field \"" << fromsite->getURL().getString()
00744                     << "\" and actual source site \"" << st->getURL().getString()
00745                     << "\" are not the same object");
00746                 LOG("sendMessage", 3, "error happened on message " << m->getLoggableString());
00747             }
00748             return;
00749         }
00750         if(peerSites.count(dynamic_cast<RemoteSite*>(&fromsite))) {
00751             if(peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName == "") {
00752                 peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName = to.getHostAndPort();
00753             }
00754             if(peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName == to.getHostAndPort()) {
00755                 to.setHostAndPort(getURL().getHostAndPort());
00756                 m->setTo(to.getString());
00757             }
00758         }
00759         vRef<Vobject> vob = Vobject::findObjectFromRoot(m->getTo());
00760         if(!(dynamic_cast<LocalSite*>(&vob) == this)) {
00761             try {
00762                 vob->sendMessage(m);
00763             } catch(exception& e) {
00764                 LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00765                 return;
00766             } catch(...) {
00767                 LOG("localsite", 2, "sendMessage emitted unknown exception");
00768                 return;
00769             }
00770         }
00771     } catch(NoSuchSiteError) {
00772         LOG("localsite", 3, "No such site on to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\")  "
00773             << m->getLoggableString());
00774         return;
00775     } catch(NoSuchObjectError) {
00776         LOG("localsite", 3, "No such to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\") object");
00777         return;
00778     } catch(URL::BadURLError) {
00779         LOG("localsite", 3, "Bad to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\") field");
00780         return;
00781     }
00782 
00783     if(m->getType() == "update") {
00784         try {
00785             vRef<Vobject> target = Vobject::findObjectFromRoot(m->getFrom());
00786             LOG("refcount", 5, "about to deliver to update Message, count on msg is " << m->getCount());
00787             target->sendUpdateMessage(m);
00788             LOG("refcount", 5, "did deliver to updateMessage, count on msg is " << m->getCount());
00789         } catch(NoSuchObjectError) {
00790             return;
00791         } catch(exception& e) {
00792             LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00793             return;
00794         } catch(...) {
00795             LOG("localsite", 2, "sendMessage emitted unknown exception");
00796             return;
00797         }
00798     }
00799 
00800     LocalMetaObject::sendMessage(m);
00801 
00802     LOG("localsite", 4, "did deliver to sendMessage, count on msg is " << m->getCount());
00803 }
00804 
00805 void LocalSite::setURL(const URL& u)
00806 {
00807     url = u;
00808 }
00809 
00810 void LocalSite::setTimeoutOnSelect(double sec)
00811 {
00812     delete selectwait;
00813     if(sec < 0) selectwait = 0;
00814     else {
00815         selectwait = new struct timeval();
00816         selectwait->tv_sec = (long)floor(sec);
00817         selectwait->tv_usec = (long)((sec - floor(sec))*1000000.0);
00818     }
00819 }
00820 
00821 double LocalSite::getTimeoutOnSelect()
00822 {
00823     if(selectwait) return (((double)selectwait->tv_sec) + (((double)selectwait->tv_usec) / 1000000.0));
00824     else return -1;
00825 }
00826 
00827 void LocalSite::addPrioritizedNonce(const string& nonce, RemoteSite* source)
00828 {
00829     source->acquire(); // released by: removePrioritizedNonce()
00830     LOG("refcount", 5, "addPrioritizedNonce for " << nonce << " acquired " << source->getURL().getString()
00831         << " count is now " << source->getCount());
00832     prioritized[nonce] = source;
00833 }
00834 
00835 void LocalSite::removePrioritizedNonce(const string& nonce, RemoteSite* source)
00836 {
00837     map<string, RemoteSite*>::iterator i = prioritized.find(nonce);
00838     if(i != prioritized.end() && (*i).second == source) {
00839         prioritized.erase(i);
00840         LOG("refcount", 5, "removePrioritizedNonce released " << source->getURL().getString()
00841             << " count is now " << (source->getCount()-1));
00842         source->release(); // acquired by: addPrioritizedNonce()
00843     }
00844 }
00845 
00846 void LocalSite::extendMetaObject(LocalMetaObject* root, const char* t) {
00847     if(localObjectExtensionTable.count(t) == 0)
00848         root->addType(t);
00849     else {
00850         pair<MI, MI> range = localObjectExtensionTable.equal_range(t); // get iterator range of types matching t
00851         for(MI it = range.first; it != range.second; it++) {
00852             MetaObject* newext = (*it).second(root, t);
00853             root->addType(newext->getType()); // add all the metaobjects for type t
00854         }
00855     }
00856 }
00857 
00858 
00859 MetaObject* LocalSite::createMetaObject(const char* desiredName, VobjectAccessControl* ac, const deque<string>& typelist)
00860 {
00861     string name = uniqueName(desiredName);
00862     if(!ac) ac = getAccessControls()[0];
00863     LocalMetaObject* root = new LocalMetaObject(name, this, ac);
00864     LocalVobject::insertChild(-1, root->getName(), root);
00865     for(deque<string>::const_iterator i = typelist.begin(); i != typelist.end() ; i++) {
00866         extendMetaObject(root, (*i).c_str());
00867     }
00868     return root;
00869 }
00870 
00871 MetaObject* LocalSite::createMetaObject(const char* name, const deque<string>& typelist) {
00872     return createMetaObject(name, 0, typelist);
00873 }
00874 
00875 MetaObject* LocalSite::createMetaObject(const char* desiredName, VobjectAccessControl* ac, const char* firstType, ...)
00876 {
00877     string name = uniqueName(desiredName);
00878     if(!ac) ac = getAccessControls()[0];
00879     LocalMetaObject* root = new LocalMetaObject(name, this, ac);
00880     LocalVobject::insertChild(-1, root->getName(), root);
00881     if(!firstType)
00882         return root;
00883     extendMetaObject(root, firstType);
00884     va_list ap;
00885     va_start(ap, firstType);
00886     for(;;) {
00887         char* p=va_arg(ap, char*);
00888         if(p == 0) break;
00889         extendMetaObject(root, p);
00890     }
00891     va_end(ap);
00892     return root;
00893 }
00894 
00895 
00896 MetaObject* LocalSite::createMetaObject(const char* desiredName, const char* first, ...)
00897 {
00898     string name = uniqueName(desiredName);
00899     LocalMetaObject* root = new LocalMetaObject(name, this, getAccessControls()[0]);
00900     LocalVobject::insertChild(-1, root->getName(), root);
00901     if(!first)
00902         return root;
00903     extendMetaObject(root, first);
00904     va_list ap;
00905     va_start(ap, first);
00906     for(;;) {
00907         char* t = va_arg(ap, char*);
00908         if(t == 0)
00909             break;
00910         extendMetaObject(root, t);
00911     }
00912     va_end(ap);
00913     return root;
00914 }
00915 
00916 
00917 void LocalSite::addLocalObjectExtension(const char* type, metaobject_extender_t newmethod)
00918 {
00919     localObjectExtensionTable.insert(multimap<string, metaobject_extender_t>::value_type(string(type), newmethod));
00920 }
00921 
00922 
00923 void LocalSite::removeLocalObjectExtension(const char* type, metaobject_extender_t oldmethod)
00924 {
00925     pair<MI, MI> g = localObjectExtensionTable.equal_range(type);
00926     for(MI it = g.first; it != g.second; it++) {
00927         if((*it).second == oldmethod) {
00928             localObjectExtensionTable.erase(it);
00929             break;
00930         }
00931     }
00932 }
00933 
00934 void LocalSite::printExtensionTable(ostream& stream) {
00935     for(MI i = localObjectExtensionTable.begin(); i != localObjectExtensionTable.end(); i++) {
00936         stream << i->first << endl;
00937     }
00938 }
00939 
00940 void LocalSite::addNotification(NotifyEvent* ev)
00941 {
00942     notifyEvents.push(ev);
00943 }
00944 
00945 void LocalSite::lockNotificationFlush()
00946 {
00947     notifyflushlock = true;
00948 }
00949 
00950 void LocalSite::unlockNotificationFlush()
00951 {
00952     notifyflushlock = false;
00953     flushNotifications();
00954 }
00955 
00956 void LocalSite::flushNotifications()
00957 {
00958     if(! notifyflushlock) {
00959         static int recursiondepth = 0;
00960         LOG("localsite", 5, "Flushing notifications (queue size is " << (unsigned int)notifyEvents.size() << ")");
00961         while(! notifyEvents.empty()) {
00962             NotifyEvent* n = notifyEvents.front();
00963             notifyEvents.pop();
00964             try {
00965                 n->notify();
00966             } catch(exception& x) {
00967                 LOG("localsite", 0, "call to notify emitted exception: " << x.what());
00968             } catch(...) { }
00969             delete n;
00970         }
00971         LOG("localsite", 5, "Done flushing notifications");
00972     }
00973 }
00974 
00975 
00976 void LocalSite::insertChild(int position, const string& contextual_name, Vobject* child)
00977     throw (AccessControlError, RemoteError)
00978 {
00979     throw AccessControlError("Impossible to do insertChild on a site, try createObject");
00980 }
00981 
00982 void LocalSite::setChild(int position, const string& contextual_name, Vobject* child)
00983     throw (AccessControlError, RemoteError)
00984 {
00985     throw AccessControlError("Impossible to do setChild on a site, try createObject");
00986 }
00987 
00988 void LocalSite::removeChild(int position)
00989     throw (AccessControlError, RemoteError)
00990 {
00991     throw AccessControlError("Impossible to do removeChild on a site, try excise");
00992 }
00993 
00994 void LocalSite::takeOverMessages(Site* was, Site* now)
00995 {
00996     for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin();
00997         i != siteMessageQueue.end();
00998         i++)
00999     {
01000         for(list<Message*>::iterator n = (*i).second.begin();
01001             n != (*i).second.end();
01002             n++)
01003         {
01004             vRef<Site> s = (*n)->getSourceSite();
01005             if(&s == was) (*n)->setSourceSite(now);
01006         }
01007     }
01008 }
01009 
01010 // There is a tiny, tiny chance that somehow the main thread
01011 // shrink the child list right in the middle of this loop,
01012 // which would cause the iterator to run off the end.
01013 // This seems like a fairly infrequent race condition,
01014 // and one which is incredibly difficult to fix
01015 // due to the fact that it would require locking
01016 // the child list (which is used everywhere).
01017 // Of course, this loop should only ever run once anyway...
01018 void LocalSite::setPrimaryHostname(string s)
01019 {
01020     if(s != url.getHost()) {
01021         url.setHost(s);
01022         addHostAlias(url.getHostAndPort());
01023         for(unsigned int i = 0; i < children.size(); i++) {
01024             if(LocalVobject* v = dynamic_cast<LocalVobject*>(children[i]->child)) {
01025                 URL u = v->getURL();
01026                 u.setHost(s);
01027                 v->setURL(u);
01028             }
01029         }
01030     }
01031 }

Generated on Tue Aug 12 03:55:39 2003 for Interreality Project - VOS by doxygen 1.3.2