Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Namespace Members | Compound Members | File Members | Related Pages | Examples

vos/corelibs/vos/typingtweaks/localsite.cc

Go to the documentation of this file.
00001 /*
00002     This file is part of the Virtual Object System of
00003     the Interreality project (http://interreality.org).
00004 
00005     Copyright (C) 2001, 2002 Peter Amstutz
00006 
00007     This library is free software; you can redistribute it and/or
00008     modify it under the terms of the GNU Lesser General Public
00009     License as published by the Free Software Foundation; either
00010     version 2 of the License, or (at your option) any later version.
00011 
00012     This library is distributed in the hope that it will be useful,
00013     but WITHOUT ANY WARRANTY; without even the implied warranty of
00014     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015     Lesser General Public License for more details.
00016 
00017     You should have received a copy of the GNU Lesser General Public
00018     License along with this library; if not, write to the Free Software
00019     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00020 
00021     Peter Amstutz <tetron@interreality.org>
00022 */
00023 #include <sys/types.h>
00024 #include <fcntl.h>
00025 #include <stdio.h>
00026 #include <errno.h>
00027 #include <math.h>
00028 
00029 #include <algorithm>
00030 #include <typeinfo>
00031 
00032 
00033 #ifdef USE_STRSTREAM
00034 #include <strstream>
00035 #else
00036 #include <sstream>
00037 #endif
00038 
00039 #include "vos.hh"
00040 
00041 /** @file
00042     Implements LocalSite.
00043  */
00044 
00045 multimap<string, metaobject_extender_t> LocalSite::localObjectExtensionTable;
00046 
00047 LocalSite::LocalSite()
00048     : VobjectImplementation("", 0, true), LocalVobject("", 0, 0),
00049       LocalMetaObject("", 0, 0), MetaObject(0), selectwait(new struct timeval()),
00050       callbacklock(false), notifyflushlock(false)
00051 {
00052     setGreeted(true);
00053 }
00054 
00055 LocalSite::~LocalSite()
00056 {
00057 }
00058 
00059 void LocalSite::scheduleMessageBlock(MessageBlock* mb, MessageContext* mc, Site* ss,
00060                                      double plustime, const char* extradependency)
00061 {
00062     if(mb->getName() != "" && mc == 0) {
00063         ss->addMessageBlock(mb);
00064     } else {
00065         if(mb->numMessages() > 1 && !mc) {
00066             /* note: So as to prevent outgoing message traps from
00067                persisting for the entire session defaultContext is NOT
00068                a parent context.  It is used only for the special
00069                cases of "to", "from" and "method".
00070             */
00071             mc = new MessageContext(); // released: after the for loop
00072             try {
00073                 mc->setParameter("to", ss->defaultContext.getParameter("to"));
00074                 mc->setParameter("from", ss->defaultContext.getParameter("from"));
00075                 mc->setParameter("method", ss->defaultContext.getParameter("method"));
00076             } catch(MessageContext::NoSuchParameterError x) {
00077                 LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00078             }
00079         }
00080         else if(mc) mc->acquire(); // released: after the for loop
00081         for(int j=0; j < mb->numMessages(); j++) {
00082             vRef<Message> m = mb->getMessage(j);
00083 
00084             if(m->getType() == "default") {
00085                 if(mc) {
00086                     if(m->hasMethod()) mc->setParameter("method", m->getMethod());
00087                     if(m->hasTo()) mc->setParameter("to", m->getTo());
00088                     if(m->hasFrom()) mc->setParameter("from", m->getFrom());
00089                 } else {
00090                     if(m->hasMethod()) ss->defaultContext.setParameter("method", m->getMethod());
00091                     if(m->hasTo()) ss->defaultContext.setParameter("to", m->getTo());
00092                     if(m->hasFrom()) ss->defaultContext.setParameter("from", m->getFrom());
00093                 }
00094             } else if(m->getType() == "include") {
00095                 try {
00096                     vRef<MessageBlock> mb = ss->getMessageBlock(m->getMethod());
00097                     vRef<MessageContext> newmc = new MessageContext();
00098                     newmc->setParentContext(mc);
00099                     try {
00100                         if(m->hasTo()) newmc->setParameter("to", m->getTo());
00101                         else newmc->setParameter("to", ss->defaultContext.getParameter("to"));
00102 
00103                         if(m->hasFrom()) newmc->setParameter("from", m->getFrom());
00104                         else newmc->setParameter("from", ss->defaultContext.getParameter("from"));
00105 
00106                         if(m->hasMethod()) newmc->setParameter("method", m->getMethod());
00107                         else newmc->setParameter("method", ss->defaultContext.getParameter("method"));
00108                     } catch(MessageContext::NoSuchParameterError x) {
00109                         LOG("localsite", 3, "scheduleMessageBlock Error " << x.what());
00110                     }
00111                     for(int i = 0; i < m->getNumFields(); i++) {
00112                         const Message::Field& f = m->getField(i);
00113                         newmc->setParameter(f.key, f.source);
00114                     }
00115                     string extradep = m->getDependency();
00116                     mc->doSubstitution(extradep);
00117                     scheduleMessageBlock(&mb, &newmc, ss, plustime + m->getTime(), extradep.c_str());
00118                 } catch(NoSuchMessageBlockError) { }
00119             } else {
00120                 vRef<Message> mcopy = (mb->getName() == "" && mc==0) ? (m->acquire(), &m) : (new Message(*m));
00121                 if(mcopy != m && mcopy->getCount() != 1) LOG("refcount", 5, "count on new (copied) message is "
00122                                                              << mcopy->getCount());
00123                 if(extradependency) {
00124                     if(mcopy->getDependency() == "") mcopy->setDependency(extradependency);
00125                     else {
00126                         string s = mcopy->getDependency();
00127                         s += ",";
00128                         s += extradependency;
00129                         mcopy->setDependency(s);
00130                     }
00131                 }
00132                 mcopy->setSourceSite(ss);
00133                 mcopy->setMessageContext(mc);
00134                 if(mcopy->getDependency() != "") {
00135                     ss->trapOutgoingReply(&mcopy);
00136                 }
00137                 if(mc) {
00138                     try {
00139                         if(! mcopy->hasMethod()) mcopy->setMethod(mc->getParameter("method"));
00140                         if(! mcopy->hasTo()) mcopy->setTo(mc->getParameter("to"));
00141                         if(! mcopy->hasFrom()) mcopy->setFrom(mc->getParameter("from"));
00142                     } catch(MessageContext::NoSuchParameterError x) {
00143                         LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00144                     }
00145                 } else {
00146                     try {
00147                         if(! mcopy->hasMethod()) mcopy->setMethod(ss->defaultContext.getParameter("method"));
00148                         if(! mcopy->hasTo()) mcopy->setTo(ss->defaultContext.getParameter("to"));
00149                         if(! mcopy->hasFrom()) mcopy->setFrom(ss->defaultContext.getParameter("from"));
00150                     } catch(MessageContext::NoSuchParameterError x) {
00151                         LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00152                     }
00153                 }
00154 
00155                 double now = getTimer();
00156 
00157 //    LOG("localsite", 3, "entered scheduleMessageBlock " << tv.tv_sec << "." << tv.tv_usec);
00158 
00159                 LOG("localsite", 4, "scheduling incoming message " << mcopy->incoming_debug);
00160                 mcopy->acquire(); // released by: runSchedule()
00161                 siteMessageQueue[now + plustime + mcopy->getTime()].push_back(&mcopy);
00162             }
00163         }
00164         if(mc) mc->release(); // acquired by: new MessageContext() or mc->acquire() before the for loop
00165     }
00166 }
00167 
00168 void LocalSite::addCallback(callback_t callback, void* userdata, double timeout, bool repeat)
00169 {
00170     double now = getTimer();
00171 
00172     for(map<double, Callback>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00173         if((*i).second.callback == callback) {
00174             (*i).second.userdata = userdata;
00175             (*i).second.timeout = timeout;
00176             (*i).second.repeat = repeat;
00177             return;
00178         } else i++;
00179     }
00180 
00181     if(! callbacklock) {
00182         callbacks[now + timeout].callback = callback;
00183         callbacks[now + timeout].userdata = userdata;
00184         callbacks[now + timeout].timeout = timeout;
00185         callbacks[now + timeout].repeat = repeat;
00186     } else {
00187         pair<double, Callback> p;
00188         p.first = now + timeout;
00189         p.second.callback = callback;
00190         p.second.userdata = userdata;
00191         p.second.timeout = timeout;
00192         p.second.repeat = repeat;
00193         callbackaddqueue.push(p);
00194     }
00195 }
00196 
00197 void LocalSite::removeCallback(callback_t callback)
00198 {
00199     if(! callbacklock) {
00200         for(map<double, Callback>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00201             if((*i).second.callback == callback) {
00202                 map<double, Callback>::iterator tmp = i;
00203                 i++;
00204                 callbacks.erase(tmp);
00205             } else i++;
00206         }
00207     } else {
00208         callbackremovequeue.push(callback);
00209     }
00210 }
00211 
00212 
00213 void LocalSite::doCallbacks()
00214 {
00215     if(! callbacklock) { // callback could call runSchedule()
00216         callbacklock = true;
00217 
00218         double now = getTimer();
00219 
00220         LOG("localsite", 4, "Running callbacks (" << callbacks.size() << ")");
00221 
00222         for(map<double, Callback>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00223             if((*i).first <= now) {
00224                 try {
00225                     (*i).second.callback((*i).second.userdata);
00226                 } catch(exception e) {
00227                     LOG("localsite", 1, "Callback emitted exception: " << e.what());
00228                 } catch(...) {
00229                     LOG("localsite", 1, "Callback emitted unknown exception!");
00230                 }
00231 
00232                 if((*i).second.repeat) {
00233                     pair<double, Callback> p;
00234                     p.first = now + (*i).second.timeout;
00235                     p.second = (*i).second;
00236                     callbackaddqueue.push(p);
00237                     LOG("localsite", 5, "resetting callback to trigger in " << (*i).second.timeout << " seconds");
00238                 }
00239 
00240                 map<double, Callback>::iterator tmp = i;
00241                 i++;
00242                 callbacks.erase(tmp);
00243             } else break;
00244         }
00245 
00246         callbacklock = false;
00247 
00248         while(! callbackaddqueue.empty())
00249         {
00250             callbacks[callbackaddqueue.front().first] = callbackaddqueue.front().second;
00251             callbackaddqueue.pop();
00252         }
00253 
00254         while(! callbackremovequeue.empty())
00255         {
00256             removeCallback(callbackremovequeue.front());
00257         }
00258 
00259         LOG("localsite", 4, "Done with callbacks");
00260     }
00261 }
00262 
00263 void LocalSite::runSchedule()
00264 {
00265     double now = getTimer();
00266 
00267     LOG("localsite", 5, "Running schedule");
00268 
00269     deque<Message*> prioritizedQueue;
00270     bool inc_i;
00271     for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin(); i != siteMessageQueue.end(); inc_i ? i++ : 0) {
00272         inc_i = true;
00273         if((*i).first <= now) {
00274             bool inc_q;
00275             for(list<Message*>::iterator q = (*i).second.begin(); q != (*i).second.end(); inc_q ? q++ : 0) {
00276                 inc_q = true;
00277                 MessageContext* mc = (*q)->getMessageContext(); // released by: mc->release() below
00278                 bool depsatisfied=true;
00279                 string dep = (*q)->getDependency();
00280                 if((*q)->getDependency() != "" && mc) {
00281                     unsigned int i=0;
00282                     unsigned int n=0;
00283                     while(i < dep.size()) {
00284                         while(i < dep.size() && dep[i] != ',') i++;
00285                         LOG("localsite", 5, "--- 0 depsatisfied " << depsatisfied << " ---");
00286                         LOG("localsite", 5, "dep.substr: " << dep.substr(n, i-n));
00287                         depsatisfied &= mc->replyRecordContains(dep.substr(n, i-n));
00288                         LOG("localsite", 5, "1 depsatisfied " << depsatisfied);
00289                         n=++i;
00290                     }
00291                 }
00292                 LOG("localsite", 5, "2 depsatisfied " << depsatisfied);
00293                 bool removeQ = false;
00294                 if(depsatisfied) {
00295                     map<string, RemoteSite*>::iterator r;
00296                     vRef<Site> s = (*q)->getSourceSite();
00297                     if(&s && (s->getGreeted() || ((*q)->getMethod() == "core:hello" && (*q)->getType() == "message")))
00298                     {
00299                         URL to((*q)->getTo());
00300                         if(! pendingValidationHosts.count(to.getHostAndPort())) {
00301                             r = prioritized.find((*q)->getNonce());
00302                             if(r != prioritized.end() && (*r).second == &s) prioritizedQueue.push_back(*q);
00303                             else msgsReady.push_back(*q);
00304 
00305                             LOG("localsite", 4, "adding msg " << (*q)->incoming_debug << " to schedule with time");
00306                             //fprintf(stderr, "%16.16f\n", (*i).first);
00307                         }
00308 
00309                         removeQ = true;
00310 
00311                     } else {
00312                         LOG("localsite", 4, "can't deliver mesage " << (*q)->getMethod() << " until source site greets us");
00313                         if(!s->isConnected() && !s->getGreeted()) removeQ = true;
00314                     }
00315                 } else {
00316                     vRef<Site> s = (*q)->getSourceSite();
00317                     if(! s->isConnected()) removeQ = true;
00318                 }
00319                 if(removeQ) {
00320                     list<Message*>::iterator old = q;
00321                     q++;
00322                     inc_q = false;
00323                     (*i).second.erase(old);
00324                     if(q == (*i).second.end()) {
00325                         if(mc) mc->release(); // acquired by: getMessageContext() above
00326                         break;
00327                     }
00328                 }
00329                 if(mc) mc->release(); // acquired by: getMessageContext() above
00330             }
00331             if((*i).second.size() == 0) {
00332                 map<double, list<Message*> >::iterator old = i;
00333                 i++;
00334                 inc_i = false;
00335                 siteMessageQueue.erase(old);
00336                 if(i == siteMessageQueue.end()) break;
00337             }
00338         } else break;
00339     }
00340 
00341     LOG("localsite", 4, "Done building schedule (" << msgsReady.size() << ", " << prioritizedQueue.size() << ")");
00342 
00343     if(msgsReady.empty() && prioritizedQueue.empty()) {
00344         LOG("localsite", 5, "Done running schedule");
00345 
00346         flushNotifications();
00347         doCallbacks();
00348 
00349         return;
00350     }
00351 
00352     /*
00353     while(! prioritizedQueue.empty()) {
00354         vRef<Message> m = prioritizedQueue.front();
00355         prioritizedQueue.pop_front();
00356         vRef<Site> s = m->getSourceSite();
00357         if(&s) LOG("refcount", 5, "scheduleMessageBlock refcount on sourcesite is " << s->getCount());
00358         try {
00359             sendMessage(&m);
00360         } catch(runtime_error e) {
00361             LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00362         } catch(...) {
00363             LOG("localsite", 2, "sendMessage emitted unknown exception");
00364         }
00365         if(!s->isConnected() && !checkScheduleHoldsSite(&s)) {
00366             Site::removeSite(*s);
00367         }
00368     }
00369     */
00370 
00371     while(! msgsReady.empty()) {
00372         vRef<Message> m = msgsReady.front();
00373         msgsReady.pop_front();
00374         vRef<Site> s = m->getSourceSite();
00375         if(&s) LOG("refcount", 5, "scheduleMessageBlock refcount on sourcesite is " << s->getCount());
00376         LOG("msgrefcount", 5, "about to send message number " << m->refcount_debug << " with count " << m->getCount());
00377 
00378         for(int i = 0; i < messagefilters.size(); i++) {
00379             if(! messagefilters[i]->checkMessage(&m)) {
00380                 LOG("localsite", 4, "Dropping filtered message: " << m->getLoggableString());
00381                 return;
00382             }
00383         }
00384 
00385         try {
00386             sendMessage(&m);
00387         } catch(runtime_error e) {
00388             LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00389         } catch(...) {
00390             LOG("localsite", 2, "sendMessage emitted unknown exception");
00391         }
00392 
00393         if(!s->isConnected() && !checkScheduleHoldsSite(&s)) {
00394             Site::removeSite(*s);
00395         }
00396     }
00397 
00398     runSchedule();
00399 }
00400 
00401 void LocalSite::removeRemotePeer(RemoteSite* rs)
00402 {
00403     if(iteratorsUsingPeerSites == 0) {
00404         map<RemoteSite*, SiteTableEntry*>::iterator i = peerSites.find(rs);
00405         if(i != peerSites.end()) {
00406             LOG("refcount", 5, "localsite: removing remote peer, count is " << (*i).first->getCount());
00407             (*i).first->release(); // acquired by: doSitePeering()
00408             if((*i).second->partialMessage)
00409                 (*i).second->partialMessage->release(); // acquired by: flushIncomingBuffers()
00410             delete (*i).second;
00411             peerSites.erase(i);
00412         }
00413     } else {
00414         boost::mutex::scoped_lock lock(peerSitesBuffer_mutex);
00415         peerSitesBuffer_remove.push_back(rs);
00416     }
00417 }
00418 
00419 bool LocalSite::checkScheduleHoldsSite(Site* site)
00420 {
00421     for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin();
00422         i != siteMessageQueue.end(); i++)
00423     {
00424         for(list<Message*>::iterator n = (*i).second.begin();
00425             n != (*i).second.end(); n++)
00426         {
00427             bool ret = false;
00428             pREF(Site*, s, (*n)->getSourceSite(),
00429                  if(s == site) ret = true;
00430                  );
00431             if(ret) return true;
00432         }
00433     }
00434     return false;
00435 }
00436 
00437 void LocalSite::sendMessage(MessageBlock* mb)
00438 {
00439     Site::sendMessage(mb);
00440     LocalVobject::sendMessage(mb);
00441 }
00442 
00443 void LocalSite::verifyCheckIDPair(RemoteSocketSite* rss, const string& spoofID)
00444 {
00445     vRef<Message> nm = new Message();
00446     nm->setType("message");
00447     nm->setTo(rss->getURL().getString());
00448     nm->setMethod("core:anti-spoof-check");
00449     nm->generateNonce();
00450     nm->insertField(-1, "peek", spoofID);
00451     rss->sendMessage(&nm);
00452 }
00453 
00454 void LocalSite::sendMessage(Message* m)
00455 {
00456     /*static int nextexpected = 1;
00457     if(m->incoming_debug != nextexpected) {
00458         LOG("localsite", 1, "Inconsistant incoming!  Expected " << nextexpected << " but got " << m->incoming_debug);
00459         nextexpected = m->incoming_debug + 1;
00460         } else nextexpected++;*/
00461 
00462     vRef<Site> sourcesite = m->getSourceSite();
00463     LOG("LocalSite::sendMessage", 3, "Incoming message: " << m->getMethod() << " (to site " << getURL().getString()
00464         << "  from " << (sourcesite==0 ? "NULL" : sourcesite->getURL().getString()) << ")");
00465     LOG("LocalSite::sendMessage", 4, "<<<(incoming)<<< sending to " << getURL().getString() << " [" << m->refcount_debug
00466         << "/" << m->incoming_debug << "/" << m->getCount() << "]\n" << m->getLoggableString());
00467 
00468     Site::sendMessage(m);
00469 
00470     if(m->getType() == "message") {
00471         if(m->getMethod() == "core:hello") {
00472             vRef<RemoteSocketSite> st = dynamic_cast<RemoteSocketSite*>(m->getSourceSite());
00473             LOG("refcount", 5, "localsite sendMessage: got handshake, refcount on site is " << st->getCount());
00474             string checkid;
00475             bool foundFavorite = false;
00476             bool shouldGreet = true;
00477             for(int i=0; i < m->getNumFields(); i++) {
00478                 const Message::Field& a=m->getField(i);
00479                 if(a.key == "antispoof") {
00480                     checkid = a.value;
00481                     setAntiSpoofIDMapping(checkid, st->getAntiSpoofID());
00482                 }
00483                 if(a.key == "alias" && !hasHostAlias(a.value) && !st->checkFlag("spoof-test")) {
00484                     LOG("localsite", 3, "Validating alias " << a.value);
00485                     bool checkit = false;
00486                     if(! getAllSites().count(a.value)) checkit = true;
00487                     else {
00488                         vRef<RemoteSocketSite> existingsite = meta_cast<RemoteSocketSite&>(findSite(a.value));
00489                         if(!(*st == *existingsite)) {
00490                             if(! existingsite->isConnected()) {
00491                                 removeSite(*existingsite);
00492                                 checkit = true;
00493                             }
00494                             LOG("localsite", 2, "We have a multiconnection.  Verifying...");
00495                             LOG("localsite", 2, "May need to bind site " << st->getURL().getString()
00496                                 << " to site " << a.value);
00497                             // do something?
00498                         }
00499                     }
00500                     if(checkit) {
00501                         bool isok = false;
00502                         if(checkid != "") {
00503                             string host;
00504                             int port;
00505                             unsigned int i;
00506                             for(i = 0; i < a.value.size() && a.value[i] != ':'; i++);
00507                             host = a.value.substr(0, i);
00508                             if(i < a.value.size()) port = atoi(a.value.substr(i+1).c_str());
00509                             else port = VOS_DEFAULT_PORT;
00510                             try {
00511                                 vRef<RemoteSocketSite> rss = AsyncConnect::connect(host, port, this, true);
00512 
00513                                 rss->removeHostAlias(a.value);
00514                                 st->addHostAlias(a.value);
00515 
00516                                 // the AntiSpoofID is the nonce *we* sent with core:hello
00517                                 verifyCheckIDPair(&rss, st->getAntiSpoofID());
00518                                 st->suppressOutgoing(true);
00519 
00520                                 rss->acquire(); // released by: handler for anti-spoof-reply
00521                                 st->acquire(); // released by:  handler for anti-spoof-reply
00522                                 pendingValidations[&rss].realsite = st;
00523                                 pendingValidations[&rss].checkid = checkid;
00524                                 pendingValidations[&rss].hostalias = a.value;
00525                                 pendingValidations[&rss].favored = ! foundFavorite;
00526                                 pendingValidationHosts.insert(a.value);
00527                                 foundFavorite = true;
00528                                 shouldGreet = false;
00529                             } catch(RemoteSite::SiteConnectionError) {
00530                                 LOG("localsite", 1, "Tried to verify '" << a.value << "' but could not connect.");
00531                                 vRef<Message> err = new Message();
00532                                 err->setType("message");
00533                                 err->setMethod("core:error");
00534                                 err->insertField(-1, "error", "Could not verify your hostname '"
00535                                                  + a.value + "'");
00536                                 st->sendMessage(&err);
00537                             }
00538                         }
00539                     } else {
00540                         if(st->getURL().getHostAndPort() != a.value) {
00541                             LOG("localsite", 1, "Attempted spoof! '" << st->getURL().getString() << "' claimed to be '"
00542                                 << a.value << "' but failed verification!");
00543                         }
00544                     }
00545                 }
00546             }
00547             st->setGreeted(shouldGreet);
00548             LOG("refcount", 5, "localsite sendMessage: done with hello, refcount on site is " << st->getCount());
00549         } else if(m->getMethod() == "core:anti-spoof-check") {
00550             try {
00551                 vRef<RemoteSite> st = dynamic_cast<RemoteSite*>(m->getSourceSite());
00552                 const Message::Field& f = m->getField("peek");
00553                 string spid = getAntiSpoofIDMapping(f.value);
00554                 if(spid == "") {
00555                     st->acquire(); // released by: doSitePeering
00556                     needSpoofIDreply.push_back(pair<string, RemoteSite*>(f.value, &st));
00557                 } else {
00558                     vRef<Message> reply = new Message();
00559                     initReply(this, &reply, m, "core:anti-spoof-reply");
00560                     reply->setFrom("");
00561                     reply->setType("message");
00562                     reply->insertField(-1, "poke", spid);
00563                     st->sendMessage(&reply);
00564                 }
00565             } catch(...) { }
00566         } else if(m->getMethod() == "core:anti-spoof-reply") {
00567             vRef<RemoteSocketSite> rss = dynamic_cast<RemoteSocketSite*>(m->getSourceSite());
00568             if(pendingValidations.count(&rss)) {
00569                 string hostalias = pendingValidations[&rss].hostalias;
00570                 vRef<RemoteSocketSite> st = pendingValidations[&rss].realsite;
00571 
00572                 bool isok=false;
00573                 try {
00574                     if(m->getField("poke").value == pendingValidations[&rss].checkid) isok = true;
00575                 } catch(Message::NoSuchFieldError) {
00576                 }
00577 
00578                 rss->handleDisconnection();
00579                 if(isok) {
00580                     LOG("localsite", 3, "Adding verified host alias " << hostalias
00581                         << " to " << st->getURL().getString());
00582                     pendingValidationHosts.erase(hostalias);
00583                     if(pendingValidations[&rss].favored) {
00584                         URL u = st->getURL();
00585                         u.setHostAndPort(hostalias);
00586                         st->setURL(u);
00587                         LOG("localsite", 3, "New preferred alias: " << st->getURL().getString());
00588                     }
00589                     st->setGreeted(true);
00590                     st->enableOutgoing(true, st->getURL().getHostAndPort());
00591                 } else {
00592                     LOG("localsite", 1, "Attempted spoof?  Was given " << hostalias
00593                         << " by " << st->getURL().getString() << " but could not verify");
00594                     vRef<Message> err = new Message();
00595                     err->setType("message");
00596                     err->setMethod("core:error");
00597                     err->insertField(-1, "error", "Could not verify your hostname '" + hostalias
00598                                      + "' (I'm not going to talk to you. Sorry).");
00599                     st->sendMessage(&err);
00600                     st->removeHostAlias(hostalias);
00601                 }
00602                 pendingValidations.erase(&rss);
00603                 rss->release(); // acquired by: handler for core:hello
00604             }
00605             return; // no reason to process this message any more
00606         } else if(m->getMethod() == "core:error") {
00607             try {
00608                 LOG("localsite", 1, "Got error: " << m->getField("error").value);
00609             } catch(Message::NoSuchFieldError) { }
00610         } else if(m->getMethod() == "core:create-object") {
00611             vRef<Message> reply = new Message();
00612             initReply(this, &reply, m, "core:set-child-update");
00613             string newobjname;
00614             deque<string> newobjtypes;
00615             try {
00616                 for(int i=0; i < m->getNumFields(); i++) {
00617                     const Message::Field& f = m->getField(i);
00618                     if(f.key == "type") {
00619                         newobjtypes.push_back(f.value);
00620                     } else if(f.key == "name") {
00621                         newobjname = f.value;
00622                     }
00623                 }
00624             } catch(Message::NoSuchFieldError) { }
00625             SiteAccessControl* sac;
00626             vRef<Site> ss = m->getSourceSite();
00627             if((sac = dynamic_cast<SiteAccessControl*>(getAccessControl()))
00628                && sac->checkCreateVobjectPermission(*ss, *this, newobjname, newobjtypes)) {
00629                 vRef<LocalMetaObject> newobj =
00630                     meta_cast<LocalMetaObject*>(createMetaObject(newobjname.c_str(), (char*)0));
00631 
00632                 for(deque<string>::iterator i = newobjtypes.begin();
00633                     i != newobjtypes.end(); i++)
00634                 {
00635                     if(getAccessControl()->checkAddTypePermission(*ss, *i))
00636                         newobj->addType(*i, &ss);
00637                 }
00638                 vRef<ParentChildRelation> pcr = newobj->findParent(*this);
00639                 char n[16];
00640                 snprintf(n, sizeof(n), "%i", pcr->position);
00641                 reply->insertField(-1, "pos", n);
00642                 reply->insertField(-1, "name", pcr->contextual_name);
00643                 reply->insertField(-1, "path", newobj->getURL().getString());
00644                 const TypeSet& ts = newobj->getTypes();
00645                 if(ts.size() > 0) {
00646                     for(TypeSet::const_iterator ti = ts.begin(); ti != ts.end(); ti++) {
00647                         reply->insertField(-1, "type", *ti);
00648                     }
00649                 } else reply->insertField(-1, "type", "");
00650                 try {
00651                     ss->sendMessage(&reply);
00652                 } catch(...) { }
00653 
00654                 LOG("refcounted", 5, "Done creating meta object, count (after release) is " << (newobj->getCount()-1));
00655             } else {
00656                 if(!sac) LOG("accesscontrol", 2, "Warning!  " << getURL().getString()
00657                              << " has no access control policy!  Failing by default.");
00658                 reply->insertField(-1, "error", "Permission denied");
00659             }
00660         }
00661     }
00662 
00663     if(&sourcesite && sourcesite->checkFlag("spoof-test")) return;
00664 
00665     try {
00666         URL to(m->getTo());
00667         URL from(m->getFrom());
00668 
00669         if(! getAllSites().count(to.getHostAndPort())) {
00670             LOG("localsite", 3, "dropping message with unknown to field " << to.getString());
00671             return;
00672         }
00673 
00674         map<string, Site*>::const_iterator sti = getAllSites().find(from.getHostAndPort());
00675         vRef<Site> fromsite;
00676         if(sti != getAllSites().end()) {
00677             fromsite = (*sti).second;
00678             fromsite->acquire();
00679         }
00680         if(! &fromsite) {
00681             LOG("localsite", 3, "dropping message with unknown from field " << from.getString());
00682             return;
00683         }
00684 
00685         LOG("refcount", 5, "localsite sendMessage: fromsite refcount is " << fromsite->getCount());
00686         vRef<Site> st = m->getSourceSite();
00687         if(! &st) {
00688             LOG("localsite", 3, "dropping message with no sourcesite!");
00689             return;
00690         }
00691 
00692         if(&fromsite != &st && st->isRemote()) {
00693             if(!st->checkFlag("spoof-test") && m->getMethod() != "core:type-add-update") {
00694                 LOG("sendMessage", 2, "Note: site in from field \"" << fromsite->getURL().getString()
00695                     << "\" and actual source site \"" << st->getURL().getString()
00696                     << "\" are not the same object");
00697                 LOG("sendMessage", 3, "error happened on message " << m->getLoggableString());
00698             }
00699             return;
00700         }
00701         if(peerSites.count(dynamic_cast<RemoteSite*>(&fromsite))) {
00702             if(peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName == "") {
00703                 peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName = to.getHostAndPort();
00704             }
00705             if(peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName == to.getHostAndPort()) {
00706                 to.setHostAndPort(getURL().getHostAndPort());
00707                 m->setTo(to.getString());
00708             }
00709         }
00710         vRef<Vobject> vob = Vobject::findObjectFromRoot(m->getTo());
00711         if(!(dynamic_cast<LocalSite*>(&vob) == this)) {
00712             try {
00713                 vob->sendMessage(m);
00714             } catch(runtime_error e) {
00715                 LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00716                 return;
00717             } catch(...) {
00718                 LOG("localsite", 2, "sendMessage emitted unknown exception");
00719                 return;
00720             }
00721         }
00722     } catch(NoSuchSiteError) {
00723         LOG("localsite", 3, "No such site on to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\")  "
00724             << m->getLoggableString());
00725         return;
00726     } catch(NoSuchObjectError) {
00727         LOG("localsite", 3, "No such to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\") object");
00728         return;
00729     } catch(URL::BadURLError) {
00730         LOG("localsite", 3, "Bad to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\") field");
00731         return;
00732     }
00733 
00734     if(m->getType() == "update") {
00735         try {
00736             vRef<Vobject> target = Vobject::findObjectFromRoot(m->getFrom());
00737             LOG("refcount", 5, "about to deliver to update Message, count on msg is " << m->getCount());
00738             target->sendUpdateMessage(m);
00739             LOG("refcount", 5, "did deliver to updateMessage, count on msg is " << m->getCount());
00740         } catch(NoSuchObjectError) {
00741             return;
00742         } catch(runtime_error e) {
00743             LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00744             return;
00745         } catch(...) {
00746             LOG("localsite", 2, "sendMessage emitted unknown exception");
00747             return;
00748         }
00749     }
00750 
00751     LocalMetaObject::sendMessage(m);
00752 
00753     LOG("localsite", 4, "did deliver to sendMessage, count on msg is " << m->getCount());
00754 }
00755 
00756 void LocalSite::setURL(const URL& u)
00757 {
00758     url = u;
00759 }
00760 
00761 void LocalSite::setTimeoutOnSelect(double sec)
00762 {
00763     delete selectwait;
00764     if(sec < 0) selectwait = 0;
00765     else {
00766         selectwait = new struct timeval();
00767         selectwait->tv_sec = (long)floor(sec);
00768         selectwait->tv_usec = (long)((sec - floor(sec))*1000000.0);
00769     }
00770 }
00771 
00772 double LocalSite::getTimeoutOnSelect()
00773 {
00774     if(selectwait) return (((double)selectwait->tv_sec) + (((double)selectwait->tv_usec) / 1000000.0));
00775     else return -1;
00776 }
00777 
00778 void LocalSite::addPrioritizedNonce(const string& nonce, RemoteSite* source)
00779 {
00780     source->acquire(); // released by: removePrioritizedNonce()
00781     LOG("refcount", 5, "addPrioritizedNonce for " << nonce << " acquired " << source->getURL().getString()
00782         << " count is now " << source->getCount());
00783     prioritized[nonce] = source;
00784 }
00785 
00786 void LocalSite::removePrioritizedNonce(const string& nonce, RemoteSite* source)
00787 {
00788     map<string, RemoteSite*>::iterator i = prioritized.find(nonce);
00789     if(i != prioritized.end() && (*i).second == source) {
00790         prioritized.erase(i);
00791         LOG("refcount", 5, "removePrioritizedNonce released " << source->getURL().getString()
00792             << " count is now " << (source->getCount()-1));
00793         source->release(); // acquired by: addPrioritizedNonce()
00794     }
00795 }
00796 
00797 
00798 void LocalSite::extendMetaObject(LocalMetaObject* root, const char* t) {
00799     if(localObjectExtensionTable.count(t) == 0) {
00800         root->addType(t);
00801     } else {
00802         pair<MI, MI> range = localObjectExtensionTable.equal_range(t); // get iterator range of types matching t
00803         for(MI it = range.first; it != range.second; it++) {
00804             MetaObject* newext = (*it).second(root, t);
00805             root->addType(newext->getType()); // add all the metaobjects for type t
00806         }
00807     }
00808 }
00809 
00810 
00811 MetaObject* LocalSite::createMetaObject(const char* desiredName, VobjectAccessControl* ac, const deque<string>& typelist)
00812 {
00813     string name = uniqueName(desiredName);
00814     if(!ac) ac = getAccessControl();
00815     LocalMetaObject* root = new LocalMetaObject(name, this, ac);
00816     LocalVobject::insertChild(-1, root->getName(), root);
00817     for(deque<string>::const_iterator i = typelist.begin(); i != typelist.end() ; i++) {
00818         extendMetaObject(root, (*i).c_str());
00819     }
00820     return root;
00821 }
00822 
00823 MetaObject* LocalSite::createMetaObject(const char* name, const deque<string>& typelist) {
00824     return createMetaObject(name, 0, typelist);
00825 }
00826 
00827 MetaObject* LocalSite::createMetaObject(const char* desiredName, VobjectAccessControl* ac, const char* firstType, ...)
00828 {
00829     string name = uniqueName(desiredName);
00830     if(!ac)
00831         ac = getAccessControl();
00832     LocalMetaObject* root = new LocalMetaObject(name, this, ac);
00833     LocalVobject::insertChild(-1, root->getName(), root);
00834     if(!firstType)
00835         return root;
00836     extendMetaObject(root, firstType);
00837     va_list ap;
00838     va_start(ap, firstType);
00839     for(;;) {
00840         char* p=va_arg(ap, char*);
00841         if(p == 0) break;
00842         extendMetaObject(root, p);
00843     }
00844     va_end(ap);
00845     return root;
00846 }
00847 
00848 
00849 MetaObject* LocalSite::createMetaObject(const char* desiredName, const char* first, ...)
00850 {
00851     string name = uniqueName(desiredName);
00852     LocalMetaObject* root = new LocalMetaObject(name, this, getAccessControl());
00853     LocalVobject::insertChild(-1, root->getName(), root);
00854     if(!first)
00855         return root;
00856     extendMetaObject(root, first);
00857     va_list ap;
00858     va_start(ap, first);
00859     for(;;) {
00860         char* t = va_arg(ap, char*);
00861         if(t == 0)
00862             break;
00863         extendMetaObject(root, t);
00864     }
00865     va_end(ap);
00866     return root;
00867 }
00868 
00869 
00870 void LocalSite::addLocalObjectExtension(const char* type, metaobject_extender_t newmethod)
00871 {
00872     localObjectExtensionTable.insert(multimap<string, metaobject_extender_t>::value_type(string(type), newmethod));
00873 }
00874 
00875 bool LocalSite::hasLocalObjectExtension(const char* type)
00876 {
00877     return (localObjectExtensionTable.count(type) > 0);
00878 }
00879 
00880 void LocalSite::removeLocalObjectExtension(const char* type, metaobject_extender_t oldmethod)
00881 {
00882     pair<MI, MI> g = localObjectExtensionTable.equal_range(type);
00883     for(MI it = g.first; it != g.second; it++) {
00884         if((*it).second == oldmethod) {
00885             localObjectExtensionTable.erase(it);
00886             break;
00887         }
00888     }
00889 }
00890 
00891 void LocalSite::printExtensionTable(ostream& stream) {
00892     for(MI i = localObjectExtensionTable.begin(); i != localObjectExtensionTable.end(); i++) {
00893         stream << i->first << endl;
00894     }
00895 }
00896 
00897 void LocalSite::addNotification(NotifyEvent* ev)
00898 {
00899     notifyEvents.push(ev);
00900 }
00901 
00902 void LocalSite::lockNotificationFlush()
00903 {
00904     notifyflushlock = true;
00905 }
00906 
00907 void LocalSite::unlockNotificationFlush()
00908 {
00909     notifyflushlock = false;
00910     flushNotifications();
00911 }
00912 
00913 void LocalSite::flushNotifications()
00914 {
00915     if(! notifyflushlock) {
00916         LOG("localsite", 5, "Flushing notifications (queue size is " << notifyEvents.size() << ")");
00917         while(! notifyEvents.empty()) {
00918             NotifyEvent* n = notifyEvents.front();
00919             notifyEvents.pop();
00920             try {
00921                 n->notify();
00922             } catch(runtime_error x) {
00923                 LOG("localsite", 0, "call to notify emitted exception: " << x.what());
00924             } catch(...) { }
00925             delete n;
00926         }
00927         LOG("localsite", 5, "Done flushing notifications");
00928     }
00929 }
00930 
00931 
00932 void LocalSite::insertChild(int position, const string& contextual_name, Vobject* child)
00933     throw (AccessControlError, RemoteError)
00934 {
00935     throw AccessControlError("Impossible to do insertChild on a site, try createObject");
00936 }
00937 
00938 void LocalSite::setChild(int position, const string& contextual_name, Vobject* child)
00939     throw (AccessControlError, RemoteError)
00940 {
00941     throw AccessControlError("Impossible to do setChild on a site, try createObject");
00942 }
00943 
00944 void LocalSite::removeChild(int position)
00945     throw (AccessControlError, RemoteError)
00946 {
00947     throw AccessControlError("Impossible to do removeChild on a site, try excise");
00948 }
00949 
00950 void LocalSite::takeOverMessages(Site* was, Site* now)
00951 {
00952     for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin();
00953         i != siteMessageQueue.end();
00954         i++)
00955     {
00956         for(list<Message*>::iterator n = (*i).second.begin();
00957             n != (*i).second.end();
00958             n++)
00959         {
00960             vRef<Site> s = (*n)->getSourceSite();
00961             if(&s == was) (*n)->setSourceSite(now);
00962         }
00963     }
00964 }
00965 
00966 // There is a tiny, tiny chance that somehow the main thread
00967 // shrink the child list right in the middle of this loop,
00968 // which would cause the iterator to run off the end.
00969 // This seems like a fairly infrequent race condition,
00970 // and one which is incredibly difficult to fix
00971 // due to the fact that it would require locking
00972 // the child list (which is used everywhere).
00973 // Of course, this loop should only ever run once anyway...
00974 void LocalSite::setPrimaryHostname(string s)
00975 {
00976     if(s != url.getHost()) {
00977         url.setHost(s);
00978         addHostAlias(url.getHostAndPort());
00979         for(unsigned int i = 0; i < children.size(); i++) {
00980             if(LocalVobject* v = dynamic_cast<LocalVobject*>(children[i]->child)) {
00981                 URL u = v->getURL();
00982                 u.setHost(s);
00983                 v->setURL(u);
00984             }
00985         }
00986     }
00987 }

Generated on Tue Aug 12 03:55:40 2003 for Interreality Project - VOS by doxygen 1.3.2