00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "localsite.hh"
00025 #include "accesscontrol.hh"
00026 #include "remotesocketsite.hh"
00027 #include "message.hh"
00028 #include "messageblock.hh"
00029 #include "timer.hh"
00030
00031 using namespace VOS;
00032
00033 #include <sys/types.h>
00034 #include <fcntl.h>
00035 #include <stdio.h>
00036 #include <errno.h>
00037 #include <math.h>
00038 #include <cstdarg>
00039 #include <algorithm>
00040 #include <typeinfo>
00041
00042 multimap<string, metaobject_extender_t> LocalSite::localObjectExtensionTable;
00043
00044 LocalSite::LocalSite()
00045 : VobjectImplementation("", 0, true), LocalVobject("", 0, 0),
00046 LocalMetaObject("", 0, 0), MetaObject(0), selectwait(new struct timeval()),
00047 callbacklock(false), notifyflushlock(false)
00048 {
00049 setGreeted(true);
00050 }
00051
00052 LocalSite::~LocalSite()
00053 {
00054 }
00055
00056 void LocalSite::scheduleMessageBlock(MessageBlock* mb, MessageContext* mc, Site* ss,
00057 double plustime, const char* extradependency)
00058 {
00059 if(mb->getName() != "" && mc == 0) {
00060 ss->addMessageBlock(mb);
00061 } else {
00062 if(mb->numMessages() > 1 && !mc) {
00063
00064
00065
00066
00067
00068 mc = new MessageContext();
00069 try {
00070 mc->setParameter("to", ss->defaultContext.getParameter("to"));
00071 mc->setParameter("from", ss->defaultContext.getParameter("from"));
00072 mc->setParameter("method", ss->defaultContext.getParameter("method"));
00073 } catch(MessageContext::NoSuchParameterError x) {
00074 LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00075 }
00076 }
00077 else if(mc) mc->acquire();
00078 for(int j=0; j < mb->numMessages(); j++) {
00079 vRef<Message> m = mb->getMessage(j);
00080
00081 if(m->getType() == "default") {
00082 if(mc) {
00083 if(m->hasMethod()) mc->setParameter("method", m->getMethod());
00084 if(m->hasTo()) mc->setParameter("to", m->getTo());
00085 if(m->hasFrom()) mc->setParameter("from", m->getFrom());
00086 } else {
00087 if(m->hasMethod()) ss->defaultContext.setParameter("method", m->getMethod());
00088 if(m->hasTo()) ss->defaultContext.setParameter("to", m->getTo());
00089 if(m->hasFrom()) ss->defaultContext.setParameter("from", m->getFrom());
00090 }
00091 } else if(m->getType() == "include") {
00092 try {
00093 vRef<MessageBlock> mb = ss->getMessageBlock(m->getMethod());
00094 vRef<MessageContext> newmc = new MessageContext();
00095 newmc->setParentContext(mc);
00096 try {
00097 if(m->hasTo()) newmc->setParameter("to", m->getTo());
00098 else newmc->setParameter("to", ss->defaultContext.getParameter("to"));
00099
00100 if(m->hasFrom()) newmc->setParameter("from", m->getFrom());
00101 else newmc->setParameter("from", ss->defaultContext.getParameter("from"));
00102
00103 if(m->hasMethod()) newmc->setParameter("method", m->getMethod());
00104 else newmc->setParameter("method", ss->defaultContext.getParameter("method"));
00105 } catch(MessageContext::NoSuchParameterError x) {
00106 LOG("localsite", 3, "scheduleMessageBlock Error " << x.what());
00107 }
00108 for(int i = 0; i < m->getNumFields(); i++) {
00109 const Message::Field& f = m->getField(i);
00110 newmc->setParameter(f.key, f.source);
00111 }
00112 string extradep = m->getDependency();
00113 mc->doSubstitution(extradep);
00114 scheduleMessageBlock(&mb, &newmc, ss, plustime + m->getTime(), extradep.c_str());
00115 } catch(NoSuchMessageBlockError) { }
00116 } else {
00117 vRef<Message> mcopy = (mb->getName() == "" && mc==0) ? (m->acquire(), &m) : (new Message(*m));
00118 if(mcopy != m && mcopy->getCount() != 1) LOG("refcount", 5, "count on new (copied) message is "
00119 << mcopy->getCount());
00120 if(extradependency) {
00121 if(mcopy->getDependency() == "") mcopy->setDependency(extradependency);
00122 else {
00123 string s = mcopy->getDependency();
00124 s += ",";
00125 s += extradependency;
00126 mcopy->setDependency(s);
00127 }
00128 }
00129 mcopy->setSourceSite(ss);
00130 mcopy->setMessageContext(mc);
00131 if(mcopy->getDependency() != "") {
00132 ss->trapOutgoingReply(&mcopy);
00133 }
00134 if(mc) {
00135 try {
00136 if(! mcopy->hasMethod()) mcopy->setMethod(mc->getParameter("method"));
00137 if(! mcopy->hasTo()) mcopy->setTo(mc->getParameter("to"));
00138 if(! mcopy->hasFrom()) mcopy->setFrom(mc->getParameter("from"));
00139 } catch(MessageContext::NoSuchParameterError x) {
00140 LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00141 }
00142 } else {
00143 try {
00144 if(! mcopy->hasMethod()) mcopy->setMethod(ss->defaultContext.getParameter("method"));
00145 if(! mcopy->hasTo()) mcopy->setTo(ss->defaultContext.getParameter("to"));
00146 if(! mcopy->hasFrom()) mcopy->setFrom(ss->defaultContext.getParameter("from"));
00147 } catch(MessageContext::NoSuchParameterError x) {
00148 LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00149 }
00150 }
00151
00152 double now = getTimer();
00153
00154
00155
00156 LOG("localsite", 4, "scheduling incoming message " << mcopy->incoming_debug);
00157 mcopy->acquire();
00158 siteMessageQueue[now + plustime + mcopy->getTime()].push_back(&mcopy);
00159 }
00160 }
00161 if(mc) mc->release();
00162 }
00163 }
00164
00165 void LocalSite::addCallback(callback_t callback, void* userdata, double timeout, bool repeat) {
00166 for(map<double, CallbackInfo>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00167 if((*i).second.callback == callback) {
00168 (*i).second.userdata = userdata;
00169 (*i).second.timeout = timeout;
00170 (*i).second.repeat = repeat;
00171 return;
00172 } else i++;
00173 }
00174 scheduleCallback(callback, 0, userdata, timeout, repeat);
00175 }
00176
00177 void LocalSite::addCallback(CallbackListener* listener, double timeout, bool repeat) {
00178 for(map<double, CallbackInfo>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00179 if((*i).second.listener == listener) {
00180 (*i).second.userdata = 0;
00181 (*i).second.timeout = timeout;
00182 (*i).second.repeat = repeat;
00183 return;
00184 } else i++;
00185 }
00186 scheduleCallback(0, listener, 0, timeout, repeat);
00187 }
00188
00189 void LocalSite::scheduleCallback(callback_t callback, CallbackListener* listener, void* userdata, double timeout, bool repeat) {
00190 double now = getTimer();
00191 if(! callbacklock) {
00192 callbacks[now + timeout].callback = callback;
00193 callbacks[now + timeout].listener = listener;
00194 callbacks[now + timeout].userdata = userdata;
00195 callbacks[now + timeout].timeout = timeout;
00196 callbacks[now + timeout].repeat = repeat;
00197 } else {
00198 pair<double, CallbackInfo> p;
00199 p.first = now + timeout;
00200 p.second.callback = callback;
00201 p.second.listener = listener;
00202 p.second.userdata = userdata;
00203 p.second.timeout = timeout;
00204 p.second.repeat = repeat;
00205 callbackaddqueue.push(p);
00206 }
00207 }
00208
00209 void LocalSite::removeCallback(callback_t callback)
00210 {
00211 if(! callbacklock) {
00212 for(map<double, CallbackInfo>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00213 if((*i).second.callback == callback) {
00214 map<double, CallbackInfo>::iterator tmp = i;
00215 i++;
00216 callbacks.erase(tmp);
00217 } else i++;
00218 }
00219 } else {
00220 callbackremovequeue.push(callback);
00221 }
00222 }
00223
00224 void LocalSite::removeCallback(CallbackListener* listener)
00225 {
00226 if(! callbacklock) {
00227 for(map<double, CallbackInfo>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00228 if((*i).second.listener == listener) {
00229 map<double, CallbackInfo>::iterator tmp = i;
00230 i++;
00231 callbacks.erase(tmp);
00232 } else i++;
00233 }
00234 } else {
00235 callback_listener_remove_queue.push(listener);
00236 }
00237 }
00238
00239 void LocalSite::doCallbacks()
00240 {
00241 if(! callbacklock) {
00242 callbacklock = true;
00243
00244 double now = getTimer();
00245
00246 LOG("localsite", 5, "Running callbacks (" << (unsigned int)callbacks.size() << ")");
00247
00248 for(map<double, CallbackInfo>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00249 if((*i).first <= now) {
00250 try {
00251 if ( (*i).second.callback )
00252 (*i).second.callback((*i).second.userdata);
00253 else if ( (*i).second.listener )
00254 (*i).second.listener->notifySiteCallback();
00255 else
00256 LOG("localsite", 1, "Warning: Callback has neither a callback function nor a listener object!");
00257
00258 } catch(exception& e) {
00259 LOG("localsite", 1, "Callback emitted exception: " << e.what());
00260 } catch(...) {
00261 LOG("localsite", 1, "Callback emitted unknown exception!");
00262 }
00263
00264 if((*i).second.repeat) {
00265 pair<double, CallbackInfo> p;
00266 p.first = now + (*i).second.timeout;
00267 p.second = (*i).second;
00268 callbackaddqueue.push(p);
00269 LOG("localsite", 5, "resetting callback to trigger in " << (*i).second.timeout << " seconds");
00270 }
00271
00272 map<double, CallbackInfo>::iterator tmp = i;
00273 i++;
00274 callbacks.erase(tmp);
00275 } else break;
00276 }
00277
00278 callbacklock = false;
00279
00280 while(! callbackaddqueue.empty())
00281 {
00282 callbacks[callbackaddqueue.front().first] = callbackaddqueue.front().second;
00283 callbackaddqueue.pop();
00284 }
00285
00286 while(! callbackremovequeue.empty())
00287 {
00288 removeCallback(callbackremovequeue.front());
00289 }
00290
00291 while(! callback_listener_remove_queue.empty())
00292 {
00293 removeCallback(callback_listener_remove_queue.front());
00294 }
00295
00296 LOG("localsite", 5, "Done with callbacks");
00297 }
00298 }
00299
00300 void LocalSite::runSchedule()
00301 {
00302 double now = getTimer();
00303
00304 LOG("localsite", 5, "Running schedule");
00305
00306 bool inc_i;
00307 for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin(); i != siteMessageQueue.end(); inc_i ? i++ : 0) {
00308 inc_i = true;
00309 if((*i).first <= now) {
00310 bool inc_q;
00311 for(list<Message*>::iterator q = (*i).second.begin(); q != (*i).second.end(); inc_q ? q++ : 0) {
00312 inc_q = true;
00313 MessageContext* mc = (*q)->getMessageContext();
00314 bool depsatisfied=true;
00315 string dep = (*q)->getDependency();
00316 if((*q)->getDependency() != "" && mc) {
00317 unsigned int i=0;
00318 unsigned int n=0;
00319 while(i < dep.size()) {
00320 while(i < dep.size() && dep[i] != ',') i++;
00321 LOG("localsite", 5, "--- 0 depsatisfied " << depsatisfied << " ---");
00322 LOG("localsite", 5, "dep.substr: " << dep.substr(n, i-n));
00323 depsatisfied &= mc->replyRecordContains(dep.substr(n, i-n));
00324 LOG("localsite", 5, "1 depsatisfied " << depsatisfied);
00325 n=++i;
00326 }
00327 }
00328 LOG("localsite", 5, "2 depsatisfied " << depsatisfied);
00329 bool removeQ = false;
00330 if(depsatisfied) {
00331 map<string, RemoteSite*>::iterator r;
00332 vRef<Site> s = (*q)->getSourceSite();
00333 if(&s && (s->getGreeted() || ((*q)->getMethod() == "core:hello" && (*q)->getType() == "message")))
00334 {
00335 URL to((*q)->getTo());
00336 if(! pendingValidationHosts.count(to.getHostAndPort())) {
00337 r = prioritized.find((*q)->getNonce());
00338 if(r != prioritized.end() && (*r).second == &s) prioritizedQueue.push_back(*q);
00339 else
00340 msgsReady.push_back(*q);
00341
00342 LOG("localsite", 4, "adding msg " << (*q)->incoming_debug << " to schedule with time");
00343
00344 }
00345
00346 removeQ = true;
00347
00348 } else {
00349 LOG("localsite", 4, "can't deliver mesage " << (*q)->getMethod() << " until source site greets us");
00350 if(!s->isConnected() && !s->getGreeted()) removeQ = true;
00351 }
00352 } else {
00353 vRef<Site> s = (*q)->getSourceSite();
00354 if(! s->isConnected()) removeQ = true;
00355 }
00356 if(removeQ) {
00357 list<Message*>::iterator old = q;
00358 q++;
00359 inc_q = false;
00360 (*i).second.erase(old);
00361 if(q == (*i).second.end()) {
00362 if(mc) mc->release();
00363 break;
00364 }
00365 }
00366 if(mc) mc->release();
00367 }
00368 if((*i).second.size() == 0) {
00369 map<double, list<Message*> >::iterator old = i;
00370 i++;
00371 inc_i = false;
00372 siteMessageQueue.erase(old);
00373 if(i == siteMessageQueue.end()) break;
00374 }
00375 } else break;
00376 }
00377
00378 LOG("localsite", 5, "Done building schedule (" << (unsigned int)msgsReady.size() << ", " << (unsigned int)prioritizedQueue.size() << ")");
00379
00380 if(msgsReady.empty() && prioritizedQueue.empty()) {
00381 LOG("localsite", 5, "Done running schedule");
00382
00383 flushNotifications();
00384 doCallbacks();
00385
00386 return;
00387 }
00388
00389 if(! prioritizedQueue.empty()) {
00390
00391 vRef<Message> m = prioritizedQueue.front();
00392 prioritizedQueue.pop_front();
00393 vRef<Site> s = m->getSourceSite();
00394 if(&s) LOG("refcount", 5, "scheduleMessageBlock refcount on sourcesite is " << s->getCount());
00395 try {
00396 sendMessage(&m);
00397 } catch(runtime_error e) {
00398 LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00399 } catch(...) {
00400 LOG("localsite", 2, "sendMessage emitted unknown exception");
00401 }
00402 if(!s->isConnected() && !checkScheduleHoldsSite(&s)) {
00403 Site::removeSite(*s);
00404 }
00405
00406 return;
00407 }
00408
00409 while(! msgsReady.empty()) {
00410 vRef<Message> m = msgsReady.front();
00411 msgsReady.pop_front();
00412 vRef<Site> s = m->getSourceSite();
00413 if(&s) LOG("refcount", 5, "scheduleMessageBlock refcount on sourcesite is " << s->getCount());
00414 LOG("msgrefcount", 5, "about to send message number " << m->refcount_debug << " with count " << m->getCount());
00415
00416 for(unsigned int i = 0; i < messagefilters.size(); i++) {
00417 if(! messagefilters[i]->checkMessage(&m)) {
00418 LOG("localsite", 4, "Dropping filtered message: " << m->getLoggableString());
00419 continue;
00420 }
00421 }
00422
00423 try {
00424 sendMessage(&m);
00425 } catch(exception& e) {
00426 LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00427 } catch(...) {
00428 LOG("localsite", 2, "sendMessage emitted unknown exception");
00429 }
00430
00431 if(!s->isConnected() && !checkScheduleHoldsSite(&s)) {
00432 Site::removeSite(*s);
00433 }
00434 }
00435
00436 runSchedule();
00437 }
00438
00439 void LocalSite::removeRemotePeer(RemoteSite* rs)
00440 {
00441 if(iteratorsUsingPeerSites == 0) {
00442 map<RemoteSite*, SiteTableEntry*>::iterator i = peerSites.find(rs);
00443 if(i != peerSites.end()) {
00444 LOG("refcount", 5, "localsite: removing remote peer, count is " << (*i).first->getCount());
00445 (*i).first->release();
00446 if((*i).second->partialMessage)
00447 (*i).second->partialMessage->release();
00448 delete (*i).second;
00449 peerSites.erase(i);
00450 }
00451 } else {
00452 boost::mutex::scoped_lock lock(peerSitesBuffer_mutex);
00453 peerSitesBuffer_remove.push_back(rs);
00454 }
00455 }
00456
00457 bool LocalSite::checkScheduleHoldsSite(Site* site)
00458 {
00459 for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin();
00460 i != siteMessageQueue.end(); i++)
00461 {
00462 for(list<Message*>::iterator n = (*i).second.begin();
00463 n != (*i).second.end(); n++)
00464 {
00465 bool ret = false;
00466 pREF(Site*, s, (*n)->getSourceSite(),
00467 if(s == site) ret = true;
00468 );
00469 if(ret) return true;
00470 }
00471 }
00472 return false;
00473 }
00474
00475 void LocalSite::sendMessage(MessageBlock* mb)
00476 {
00477 Site::sendMessage(mb);
00478 LocalVobject::sendMessage(mb);
00479 }
00480
00481 void LocalSite::verifyCheckIDPair(RemoteSocketSite* rss, const string& spoofID)
00482 {
00483 vRef<Message> nm = new Message();
00484 nm->setType("message");
00485 nm->setTo(rss->getURL().getString());
00486 nm->setMethod("core:anti-spoof-check");
00487 nm->generateNonce();
00488 nm->insertField(-1, "peek", spoofID);
00489 rss->sendMessage(&nm);
00490 }
00491
00492 void LocalSite::sendMessage(Message* m)
00493 {
00494
00495
00496
00497
00498
00499
00500 vRef<Site> sourcesite = m->getSourceSite();
00501 LOG("LocalSite::sendMessage", 3, "Incoming message: " << m->getMethod() << " (to site " << getURL().getString()
00502 << " from " << (sourcesite==0 ? "NULL" : sourcesite->getURL().getString()) << ")");
00503 LOG("LocalSite::sendMessage", 4, "<<<(incoming)<<< sending to " << getURL().getString() << " [" << m->refcount_debug
00504 << "/" << m->incoming_debug << "/" << m->getCount() << "]\n" << m->getLoggableString().substr(0, 768));
00505
00506 Site::sendMessage(m);
00507
00508 if(m->getType() == "message") {
00509 if(m->getMethod() == "core:hello") {
00510 vRef<RemoteSocketSite> st = dynamic_cast<RemoteSocketSite*>(m->getSourceSite());
00511 LOG("refcount", 5, "localsite sendMessage: got handshake, refcount on site is " << st->getCount());
00512 string checkid;
00513 bool foundFavorite = false;
00514 bool shouldGreet = true;
00515 for(int i=0; i < m->getNumFields(); i++) {
00516 const Message::Field& a=m->getField(i);
00517 if(a.key == "antispoof") {
00518 checkid = a.value;
00519 setAntiSpoofIDMapping(checkid, st->getAntiSpoofID());
00520 }
00521 if(a.key == "alias" && !hasHostAlias(a.value) && !st->checkFlag("spoof-test")) {
00522 LOG("localsite", 3, "Validating alias " << a.value);
00523 bool checkit = false;
00524 if(! getAllSites().count(a.value)) checkit = true;
00525 else {
00526 vRef<RemoteSocketSite> existingsite = meta_cast<RemoteSocketSite*>(&findSite(a.value));
00527 if(!&existingsite) throw bad_cast();
00528 if(!(*st == *existingsite)) {
00529 if(! existingsite->isConnected()) {
00530 removeSite(*existingsite);
00531 checkit = true;
00532 } else {
00533
00534 LOG("localsite", 2, "May need to bind site " << st->getURL().getString()
00535 << " to site " << a.value);
00536
00537 }
00538 }
00539 }
00540 if(checkit) {
00541 if(checkid != "") {
00542 string host;
00543 int port;
00544 unsigned int i;
00545 for(i = 0; i < a.value.size() && a.value[i] != ':'; i++);
00546 host = a.value.substr(0, i);
00547 if(i < a.value.size()) port = atoi(a.value.substr(i+1).c_str());
00548 else port = VOS_DEFAULT_PORT;
00549 try {
00550 vRef<RemoteSocketSite> rss = AsyncConnect::connect(host, port, this, true);
00551
00552 rss->removeHostAlias(a.value);
00553 st->addHostAlias(a.value);
00554
00555
00556 verifyCheckIDPair(&rss, st->getAntiSpoofID());
00557 st->suppressOutgoing(true);
00558
00559 rss->acquire();
00560 st->acquire();
00561 pendingValidations[&rss].realsite = st;
00562 pendingValidations[&rss].checkid = checkid;
00563 pendingValidations[&rss].hostalias = a.value;
00564 pendingValidations[&rss].favored = ! foundFavorite;
00565 pendingValidationHosts.insert(a.value);
00566 foundFavorite = true;
00567 shouldGreet = false;
00568 } catch(RemoteSite::SiteConnectionError) {
00569 LOG("localsite", 1, "Tried to verify '" << a.value << "' but could not connect.");
00570 vRef<Message> err = new Message();
00571 err->setType("message");
00572 err->setMethod("core:error");
00573 err->insertField(-1, "error", "Could not verify your hostname '"
00574 + a.value + "'");
00575 st->sendMessage(&err);
00576 }
00577 }
00578 } else {
00579 if(st->getURL().getHostAndPort() != a.value) {
00580 LOG("localsite", 1, "Attempted spoof! '" << st->getURL().getString() << "' claimed to be '"
00581 << a.value << "' but failed verification!");
00582 }
00583 }
00584 }
00585 }
00586 st->setGreeted(shouldGreet);
00587 LOG("refcount", 5, "localsite sendMessage: done with hello, refcount on site is " << st->getCount());
00588 } else if(m->getMethod() == "core:anti-spoof-check") {
00589 try {
00590 vRef<RemoteSite> st = dynamic_cast<RemoteSite*>(m->getSourceSite());
00591 const Message::Field& f = m->getField("peek");
00592 string spid = getAntiSpoofIDMapping(f.value);
00593 if(spid == "") {
00594 st->acquire();
00595 needSpoofIDreply.push_back(pair<string, RemoteSite*>(f.value, &st));
00596 } else {
00597 vRef<Message> reply = new Message();
00598 initReply(this, &reply, m, "core:anti-spoof-reply");
00599 reply->setFrom("");
00600 reply->setType("message");
00601 reply->insertField(-1, "poke", spid);
00602 st->sendMessage(&reply);
00603 }
00604 } catch(...) { }
00605 } else if(m->getMethod() == "core:anti-spoof-reply") {
00606 vRef<RemoteSocketSite> rss = dynamic_cast<RemoteSocketSite*>(m->getSourceSite());
00607 if(pendingValidations.count(&rss)) {
00608 string hostalias = pendingValidations[&rss].hostalias;
00609 vRef<RemoteSocketSite> st = pendingValidations[&rss].realsite;
00610
00611 bool isok=false;
00612 try {
00613 if(m->getField("poke").value == pendingValidations[&rss].checkid) isok = true;
00614 } catch(Message::NoSuchFieldError) {
00615 }
00616
00617 rss->handleDisconnection();
00618 if(isok) {
00619 LOG("localsite", 3, "Adding verified host alias " << hostalias
00620 << " to " << st->getURL().getString());
00621 pendingValidationHosts.erase(hostalias);
00622 if(pendingValidations[&rss].favored) {
00623 URL u = st->getURL();
00624 u.setHostAndPort(hostalias);
00625 st->setURL(u);
00626 LOG("localsite", 3, "New preferred alias: " << st->getURL().getString());
00627 }
00628 st->setGreeted(true);
00629 st->enableOutgoing(true, st->getURL().getHostAndPort());
00630 } else {
00631 LOG("localsite", 1, "Attempted spoof? Was given " << hostalias
00632 << " by " << st->getURL().getString() << " but could not verify");
00633 vRef<Message> err = new Message();
00634 err->setType("message");
00635 err->setMethod("core:error");
00636 err->insertField(-1, "error", "Could not verify your hostname '" + hostalias
00637 + "' (I'm not going to talk to you. Sorry).");
00638 st->sendMessage(&err);
00639 st->removeHostAlias(hostalias);
00640 }
00641 pendingValidations.erase(&rss);
00642 rss->release();
00643 }
00644 return;
00645 } else if(m->getMethod() == "core:error") {
00646 try {
00647 LOG("localsite", 1, "Got error: " << m->getField("error").value);
00648 } catch(Message::NoSuchFieldError) { }
00649 } else if(m->getMethod() == "core:create-object") {
00650 vRef<Message> reply = new Message();
00651 initReply(this, &reply, m, "core:set-child-update");
00652 string newobjname;
00653 deque<string> newobjtypes;
00654 try {
00655 for(int i=0; i < m->getNumFields(); i++) {
00656 const Message::Field& f = m->getField(i);
00657 if(f.key == "type") {
00658 newobjtypes.push_back(f.value);
00659 } else if(f.key == "name") {
00660 newobjname = f.value;
00661 }
00662 }
00663 } catch(Message::NoSuchFieldError) { }
00664
00665 vRef<Site> ss = m->getSourceSite();
00666
00667 const deque<VobjectAccessControl*>& ac = getAccessControls();
00668 bool permitted = true;
00669 string errmessage;
00670 for(unsigned int i = 0; i < ac.size(); i++) {
00671 SiteAccessControl* sac = dynamic_cast<SiteAccessControl*>(ac[i]);
00672 if(!sac || !sac->checkCreateVobjectPermission(*ss, *this, newobjname, newobjtypes, errmessage)) {
00673 permitted = false;
00674 break;
00675 }
00676 }
00677 if(ac.size() == 0) permitted = false;
00678
00679 if(permitted) {
00680 vRef<LocalMetaObject> newobj =
00681 meta_cast<LocalMetaObject*>(createMetaObject(newobjname.c_str(), (char*)0));
00682
00683 for(deque<string>::iterator i = newobjtypes.begin();
00684 i != newobjtypes.end(); i++)
00685 {
00686 string message;
00687 vRef<VobjectEvent> event = new VobjectEvent(VobjectEvent::TypeInsert, *ss, *newobj, *i);
00688 if(validateAccess(*event, message)) newobj->addType(*i, *ss);
00689 }
00690 vRef<ParentChildRelation> pcr = newobj->findParent(*this);
00691 char n[16];
00692 snprintf(n, sizeof(n), "%i", pcr->position);
00693 reply->insertField(-1, "pos", n);
00694 reply->insertField(-1, "name", pcr->contextual_name);
00695 reply->insertField(-1, "path", newobj->getURL().getString());
00696 const TypeSet& ts = newobj->getTypes();
00697 if(ts.size() > 0) {
00698 for(TypeSet::const_iterator ti = ts.begin(); ti != ts.end(); ti++) {
00699 reply->insertField(-1, "type", *ti);
00700 }
00701 } else reply->insertField(-1, "type", "");
00702 LOG("refcounted", 5, "Done creating meta object, count (after release) is " << (newobj->getCount()-1));
00703 } else {
00704 reply->insertField(-1, "error", "Permission denied: " + errmessage);
00705 }
00706 try {
00707 ss->sendMessage(&reply);
00708 } catch(...) { }
00709 }
00710 }
00711
00712 if(&sourcesite && sourcesite->checkFlag("spoof-test")) return;
00713
00714 try {
00715 URL to(m->getTo());
00716 URL from(m->getFrom());
00717
00718 if(! getAllSites().count(to.getHostAndPort())) {
00719 LOG("localsite", 3, "dropping message with unknown to field " << to.getString());
00720 return;
00721 }
00722
00723 map<string, Site*>::const_iterator sti = getAllSites().find(from.getHostAndPort());
00724 vRef<Site> fromsite;
00725 if(sti != getAllSites().end()) {
00726 fromsite = (*sti).second;
00727 fromsite->acquire();
00728 }
00729 if(! &fromsite) {
00730 LOG("localsite", 3, "dropping message with unknown from field " << from.getString());
00731 return;
00732 }
00733
00734 LOG("refcount", 5, "localsite sendMessage: fromsite refcount is " << fromsite->getCount());
00735 vRef<Site> st = m->getSourceSite();
00736 if(! &st) {
00737 LOG("localsite", 3, "dropping message with no sourcesite!");
00738 return;
00739 }
00740
00741 if(&fromsite != &st && st->isRemote()) {
00742 if(!st->checkFlag("spoof-test") && m->getMethod() != "core:type-add-update") {
00743 LOG("sendMessage", 2, "Note: site in from field \"" << fromsite->getURL().getString()
00744 << "\" and actual source site \"" << st->getURL().getString()
00745 << "\" are not the same object");
00746 LOG("sendMessage", 3, "error happened on message " << m->getLoggableString());
00747 }
00748 return;
00749 }
00750 if(peerSites.count(dynamic_cast<RemoteSite*>(&fromsite))) {
00751 if(peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName == "") {
00752 peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName = to.getHostAndPort();
00753 }
00754 if(peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName == to.getHostAndPort()) {
00755 to.setHostAndPort(getURL().getHostAndPort());
00756 m->setTo(to.getString());
00757 }
00758 }
00759 vRef<Vobject> vob = Vobject::findObjectFromRoot(m->getTo());
00760 if(!(dynamic_cast<LocalSite*>(&vob) == this)) {
00761 try {
00762 vob->sendMessage(m);
00763 } catch(exception& e) {
00764 LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00765 return;
00766 } catch(...) {
00767 LOG("localsite", 2, "sendMessage emitted unknown exception");
00768 return;
00769 }
00770 }
00771 } catch(NoSuchSiteError) {
00772 LOG("localsite", 3, "No such site on to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\") "
00773 << m->getLoggableString());
00774 return;
00775 } catch(NoSuchObjectError) {
00776 LOG("localsite", 3, "No such to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\") object");
00777 return;
00778 } catch(URL::BadURLError) {
00779 LOG("localsite", 3, "Bad to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\") field");
00780 return;
00781 }
00782
00783 if(m->getType() == "update") {
00784 try {
00785 vRef<Vobject> target = Vobject::findObjectFromRoot(m->getFrom());
00786 LOG("refcount", 5, "about to deliver to update Message, count on msg is " << m->getCount());
00787 target->sendUpdateMessage(m);
00788 LOG("refcount", 5, "did deliver to updateMessage, count on msg is " << m->getCount());
00789 } catch(NoSuchObjectError) {
00790 return;
00791 } catch(exception& e) {
00792 LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00793 return;
00794 } catch(...) {
00795 LOG("localsite", 2, "sendMessage emitted unknown exception");
00796 return;
00797 }
00798 }
00799
00800 LocalMetaObject::sendMessage(m);
00801
00802 LOG("localsite", 4, "did deliver to sendMessage, count on msg is " << m->getCount());
00803 }
00804
00805 void LocalSite::setURL(const URL& u)
00806 {
00807 url = u;
00808 }
00809
00810 void LocalSite::setTimeoutOnSelect(double sec)
00811 {
00812 delete selectwait;
00813 if(sec < 0) selectwait = 0;
00814 else {
00815 selectwait = new struct timeval();
00816 selectwait->tv_sec = (long)floor(sec);
00817 selectwait->tv_usec = (long)((sec - floor(sec))*1000000.0);
00818 }
00819 }
00820
00821 double LocalSite::getTimeoutOnSelect()
00822 {
00823 if(selectwait) return (((double)selectwait->tv_sec) + (((double)selectwait->tv_usec) / 1000000.0));
00824 else return -1;
00825 }
00826
00827 void LocalSite::addPrioritizedNonce(const string& nonce, RemoteSite* source)
00828 {
00829 source->acquire();
00830 LOG("refcount", 5, "addPrioritizedNonce for " << nonce << " acquired " << source->getURL().getString()
00831 << " count is now " << source->getCount());
00832 prioritized[nonce] = source;
00833 }
00834
00835 void LocalSite::removePrioritizedNonce(const string& nonce, RemoteSite* source)
00836 {
00837 map<string, RemoteSite*>::iterator i = prioritized.find(nonce);
00838 if(i != prioritized.end() && (*i).second == source) {
00839 prioritized.erase(i);
00840 LOG("refcount", 5, "removePrioritizedNonce released " << source->getURL().getString()
00841 << " count is now " << (source->getCount()-1));
00842 source->release();
00843 }
00844 }
00845
00846 void LocalSite::extendMetaObject(LocalMetaObject* root, const char* t) {
00847 if(localObjectExtensionTable.count(t) == 0)
00848 root->addType(t);
00849 else {
00850 pair<MI, MI> range = localObjectExtensionTable.equal_range(t);
00851 for(MI it = range.first; it != range.second; it++) {
00852 MetaObject* newext = (*it).second(root, t);
00853 root->addType(newext->getType());
00854 }
00855 }
00856 }
00857
00858
00859 MetaObject* LocalSite::createMetaObject(const char* desiredName, VobjectAccessControl* ac, const deque<string>& typelist)
00860 {
00861 string name = uniqueName(desiredName);
00862 if(!ac) ac = getAccessControls()[0];
00863 LocalMetaObject* root = new LocalMetaObject(name, this, ac);
00864 LocalVobject::insertChild(-1, root->getName(), root);
00865 for(deque<string>::const_iterator i = typelist.begin(); i != typelist.end() ; i++) {
00866 extendMetaObject(root, (*i).c_str());
00867 }
00868 return root;
00869 }
00870
00871 MetaObject* LocalSite::createMetaObject(const char* name, const deque<string>& typelist) {
00872 return createMetaObject(name, 0, typelist);
00873 }
00874
00875 MetaObject* LocalSite::createMetaObject(const char* desiredName, VobjectAccessControl* ac, const char* firstType, ...)
00876 {
00877 string name = uniqueName(desiredName);
00878 if(!ac) ac = getAccessControls()[0];
00879 LocalMetaObject* root = new LocalMetaObject(name, this, ac);
00880 LocalVobject::insertChild(-1, root->getName(), root);
00881 if(!firstType)
00882 return root;
00883 extendMetaObject(root, firstType);
00884 va_list ap;
00885 va_start(ap, firstType);
00886 for(;;) {
00887 char* p=va_arg(ap, char*);
00888 if(p == 0) break;
00889 extendMetaObject(root, p);
00890 }
00891 va_end(ap);
00892 return root;
00893 }
00894
00895
00896 MetaObject* LocalSite::createMetaObject(const char* desiredName, const char* first, ...)
00897 {
00898 string name = uniqueName(desiredName);
00899 LocalMetaObject* root = new LocalMetaObject(name, this, getAccessControls()[0]);
00900 LocalVobject::insertChild(-1, root->getName(), root);
00901 if(!first)
00902 return root;
00903 extendMetaObject(root, first);
00904 va_list ap;
00905 va_start(ap, first);
00906 for(;;) {
00907 char* t = va_arg(ap, char*);
00908 if(t == 0)
00909 break;
00910 extendMetaObject(root, t);
00911 }
00912 va_end(ap);
00913 return root;
00914 }
00915
00916
00917 void LocalSite::addLocalObjectExtension(const char* type, metaobject_extender_t newmethod)
00918 {
00919 localObjectExtensionTable.insert(multimap<string, metaobject_extender_t>::value_type(string(type), newmethod));
00920 }
00921
00922
00923 void LocalSite::removeLocalObjectExtension(const char* type, metaobject_extender_t oldmethod)
00924 {
00925 pair<MI, MI> g = localObjectExtensionTable.equal_range(type);
00926 for(MI it = g.first; it != g.second; it++) {
00927 if((*it).second == oldmethod) {
00928 localObjectExtensionTable.erase(it);
00929 break;
00930 }
00931 }
00932 }
00933
00934 void LocalSite::printExtensionTable(ostream& stream) {
00935 for(MI i = localObjectExtensionTable.begin(); i != localObjectExtensionTable.end(); i++) {
00936 stream << i->first << endl;
00937 }
00938 }
00939
00940 void LocalSite::addNotification(NotifyEvent* ev)
00941 {
00942 notifyEvents.push(ev);
00943 }
00944
00945 void LocalSite::lockNotificationFlush()
00946 {
00947 notifyflushlock = true;
00948 }
00949
00950 void LocalSite::unlockNotificationFlush()
00951 {
00952 notifyflushlock = false;
00953 flushNotifications();
00954 }
00955
00956 void LocalSite::flushNotifications()
00957 {
00958 if(! notifyflushlock) {
00959 static int recursiondepth = 0;
00960 LOG("localsite", 5, "Flushing notifications (queue size is " << (unsigned int)notifyEvents.size() << ")");
00961 while(! notifyEvents.empty()) {
00962 NotifyEvent* n = notifyEvents.front();
00963 notifyEvents.pop();
00964 try {
00965 n->notify();
00966 } catch(exception& x) {
00967 LOG("localsite", 0, "call to notify emitted exception: " << x.what());
00968 } catch(...) { }
00969 delete n;
00970 }
00971 LOG("localsite", 5, "Done flushing notifications");
00972 }
00973 }
00974
00975
00976 void LocalSite::insertChild(int position, const string& contextual_name, Vobject* child)
00977 throw (AccessControlError, RemoteError)
00978 {
00979 throw AccessControlError("Impossible to do insertChild on a site, try createObject");
00980 }
00981
00982 void LocalSite::setChild(int position, const string& contextual_name, Vobject* child)
00983 throw (AccessControlError, RemoteError)
00984 {
00985 throw AccessControlError("Impossible to do setChild on a site, try createObject");
00986 }
00987
00988 void LocalSite::removeChild(int position)
00989 throw (AccessControlError, RemoteError)
00990 {
00991 throw AccessControlError("Impossible to do removeChild on a site, try excise");
00992 }
00993
00994 void LocalSite::takeOverMessages(Site* was, Site* now)
00995 {
00996 for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin();
00997 i != siteMessageQueue.end();
00998 i++)
00999 {
01000 for(list<Message*>::iterator n = (*i).second.begin();
01001 n != (*i).second.end();
01002 n++)
01003 {
01004 vRef<Site> s = (*n)->getSourceSite();
01005 if(&s == was) (*n)->setSourceSite(now);
01006 }
01007 }
01008 }
01009
01010
01011
01012
01013
01014
01015
01016
01017
01018 void LocalSite::setPrimaryHostname(string s)
01019 {
01020 if(s != url.getHost()) {
01021 url.setHost(s);
01022 addHostAlias(url.getHostAndPort());
01023 for(unsigned int i = 0; i < children.size(); i++) {
01024 if(LocalVobject* v = dynamic_cast<LocalVobject*>(children[i]->child)) {
01025 URL u = v->getURL();
01026 u.setHost(s);
01027 v->setURL(u);
01028 }
01029 }
01030 }
01031 }