00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <sys/types.h>
00024 #include <fcntl.h>
00025 #include <stdio.h>
00026 #include <errno.h>
00027 #include <math.h>
00028
00029 #include <algorithm>
00030 #include <typeinfo>
00031
00032
00033 #ifdef USE_STRSTREAM
00034 #include <strstream>
00035 #else
00036 #include <sstream>
00037 #endif
00038
00039 #include "vos.hh"
00040
00041
00042
00043
00044
00045 multimap<string, metaobject_extender_t> LocalSite::localObjectExtensionTable;
00046
00047 LocalSite::LocalSite()
00048 : VobjectImplementation("", 0, true), LocalVobject("", 0, 0),
00049 LocalMetaObject("", 0, 0), MetaObject(0), selectwait(new struct timeval()),
00050 callbacklock(false), notifyflushlock(false)
00051 {
00052 setGreeted(true);
00053 }
00054
00055 LocalSite::~LocalSite()
00056 {
00057 }
00058
00059 void LocalSite::scheduleMessageBlock(MessageBlock* mb, MessageContext* mc, Site* ss,
00060 double plustime, const char* extradependency)
00061 {
00062 if(mb->getName() != "" && mc == 0) {
00063 ss->addMessageBlock(mb);
00064 } else {
00065 if(mb->numMessages() > 1 && !mc) {
00066
00067
00068
00069
00070
00071 mc = new MessageContext();
00072 try {
00073 mc->setParameter("to", ss->defaultContext.getParameter("to"));
00074 mc->setParameter("from", ss->defaultContext.getParameter("from"));
00075 mc->setParameter("method", ss->defaultContext.getParameter("method"));
00076 } catch(MessageContext::NoSuchParameterError x) {
00077 LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00078 }
00079 }
00080 else if(mc) mc->acquire();
00081 for(int j=0; j < mb->numMessages(); j++) {
00082 vRef<Message> m = mb->getMessage(j);
00083
00084 if(m->getType() == "default") {
00085 if(mc) {
00086 if(m->hasMethod()) mc->setParameter("method", m->getMethod());
00087 if(m->hasTo()) mc->setParameter("to", m->getTo());
00088 if(m->hasFrom()) mc->setParameter("from", m->getFrom());
00089 } else {
00090 if(m->hasMethod()) ss->defaultContext.setParameter("method", m->getMethod());
00091 if(m->hasTo()) ss->defaultContext.setParameter("to", m->getTo());
00092 if(m->hasFrom()) ss->defaultContext.setParameter("from", m->getFrom());
00093 }
00094 } else if(m->getType() == "include") {
00095 try {
00096 vRef<MessageBlock> mb = ss->getMessageBlock(m->getMethod());
00097 vRef<MessageContext> newmc = new MessageContext();
00098 newmc->setParentContext(mc);
00099 try {
00100 if(m->hasTo()) newmc->setParameter("to", m->getTo());
00101 else newmc->setParameter("to", ss->defaultContext.getParameter("to"));
00102
00103 if(m->hasFrom()) newmc->setParameter("from", m->getFrom());
00104 else newmc->setParameter("from", ss->defaultContext.getParameter("from"));
00105
00106 if(m->hasMethod()) newmc->setParameter("method", m->getMethod());
00107 else newmc->setParameter("method", ss->defaultContext.getParameter("method"));
00108 } catch(MessageContext::NoSuchParameterError x) {
00109 LOG("localsite", 3, "scheduleMessageBlock Error " << x.what());
00110 }
00111 for(int i = 0; i < m->getNumFields(); i++) {
00112 const Message::Field& f = m->getField(i);
00113 newmc->setParameter(f.key, f.source);
00114 }
00115 string extradep = m->getDependency();
00116 mc->doSubstitution(extradep);
00117 scheduleMessageBlock(&mb, &newmc, ss, plustime + m->getTime(), extradep.c_str());
00118 } catch(NoSuchMessageBlockError) { }
00119 } else {
00120 vRef<Message> mcopy = (mb->getName() == "" && mc==0) ? (m->acquire(), &m) : (new Message(*m));
00121 if(mcopy != m && mcopy->getCount() != 1) LOG("refcount", 5, "count on new (copied) message is "
00122 << mcopy->getCount());
00123 if(extradependency) {
00124 if(mcopy->getDependency() == "") mcopy->setDependency(extradependency);
00125 else {
00126 string s = mcopy->getDependency();
00127 s += ",";
00128 s += extradependency;
00129 mcopy->setDependency(s);
00130 }
00131 }
00132 mcopy->setSourceSite(ss);
00133 mcopy->setMessageContext(mc);
00134 if(mcopy->getDependency() != "") {
00135 ss->trapOutgoingReply(&mcopy);
00136 }
00137 if(mc) {
00138 try {
00139 if(! mcopy->hasMethod()) mcopy->setMethod(mc->getParameter("method"));
00140 if(! mcopy->hasTo()) mcopy->setTo(mc->getParameter("to"));
00141 if(! mcopy->hasFrom()) mcopy->setFrom(mc->getParameter("from"));
00142 } catch(MessageContext::NoSuchParameterError x) {
00143 LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00144 }
00145 } else {
00146 try {
00147 if(! mcopy->hasMethod()) mcopy->setMethod(ss->defaultContext.getParameter("method"));
00148 if(! mcopy->hasTo()) mcopy->setTo(ss->defaultContext.getParameter("to"));
00149 if(! mcopy->hasFrom()) mcopy->setFrom(ss->defaultContext.getParameter("from"));
00150 } catch(MessageContext::NoSuchParameterError x) {
00151 LOG("localsite", 3, "scheduleMessageBlock error " << x.what());
00152 }
00153 }
00154
00155 double now = getTimer();
00156
00157
00158
00159 LOG("localsite", 4, "scheduling incoming message " << mcopy->incoming_debug);
00160 mcopy->acquire();
00161 siteMessageQueue[now + plustime + mcopy->getTime()].push_back(&mcopy);
00162 }
00163 }
00164 if(mc) mc->release();
00165 }
00166 }
00167
00168 void LocalSite::addCallback(callback_t callback, void* userdata, double timeout, bool repeat)
00169 {
00170 double now = getTimer();
00171
00172 for(map<double, Callback>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00173 if((*i).second.callback == callback) {
00174 (*i).second.userdata = userdata;
00175 (*i).second.timeout = timeout;
00176 (*i).second.repeat = repeat;
00177 return;
00178 } else i++;
00179 }
00180
00181 if(! callbacklock) {
00182 callbacks[now + timeout].callback = callback;
00183 callbacks[now + timeout].userdata = userdata;
00184 callbacks[now + timeout].timeout = timeout;
00185 callbacks[now + timeout].repeat = repeat;
00186 } else {
00187 pair<double, Callback> p;
00188 p.first = now + timeout;
00189 p.second.callback = callback;
00190 p.second.userdata = userdata;
00191 p.second.timeout = timeout;
00192 p.second.repeat = repeat;
00193 callbackaddqueue.push(p);
00194 }
00195 }
00196
00197 void LocalSite::removeCallback(callback_t callback)
00198 {
00199 if(! callbacklock) {
00200 for(map<double, Callback>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00201 if((*i).second.callback == callback) {
00202 map<double, Callback>::iterator tmp = i;
00203 i++;
00204 callbacks.erase(tmp);
00205 } else i++;
00206 }
00207 } else {
00208 callbackremovequeue.push(callback);
00209 }
00210 }
00211
00212
00213 void LocalSite::doCallbacks()
00214 {
00215 if(! callbacklock) {
00216 callbacklock = true;
00217
00218 double now = getTimer();
00219
00220 LOG("localsite", 4, "Running callbacks (" << callbacks.size() << ")");
00221
00222 for(map<double, Callback>::iterator i = callbacks.begin(); i != callbacks.end(); ) {
00223 if((*i).first <= now) {
00224 try {
00225 (*i).second.callback((*i).second.userdata);
00226 } catch(exception e) {
00227 LOG("localsite", 1, "Callback emitted exception: " << e.what());
00228 } catch(...) {
00229 LOG("localsite", 1, "Callback emitted unknown exception!");
00230 }
00231
00232 if((*i).second.repeat) {
00233 pair<double, Callback> p;
00234 p.first = now + (*i).second.timeout;
00235 p.second = (*i).second;
00236 callbackaddqueue.push(p);
00237 LOG("localsite", 5, "resetting callback to trigger in " << (*i).second.timeout << " seconds");
00238 }
00239
00240 map<double, Callback>::iterator tmp = i;
00241 i++;
00242 callbacks.erase(tmp);
00243 } else break;
00244 }
00245
00246 callbacklock = false;
00247
00248 while(! callbackaddqueue.empty())
00249 {
00250 callbacks[callbackaddqueue.front().first] = callbackaddqueue.front().second;
00251 callbackaddqueue.pop();
00252 }
00253
00254 while(! callbackremovequeue.empty())
00255 {
00256 removeCallback(callbackremovequeue.front());
00257 }
00258
00259 LOG("localsite", 4, "Done with callbacks");
00260 }
00261 }
00262
00263 void LocalSite::runSchedule()
00264 {
00265 double now = getTimer();
00266
00267 LOG("localsite", 5, "Running schedule");
00268
00269 deque<Message*> prioritizedQueue;
00270 bool inc_i;
00271 for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin(); i != siteMessageQueue.end(); inc_i ? i++ : 0) {
00272 inc_i = true;
00273 if((*i).first <= now) {
00274 bool inc_q;
00275 for(list<Message*>::iterator q = (*i).second.begin(); q != (*i).second.end(); inc_q ? q++ : 0) {
00276 inc_q = true;
00277 MessageContext* mc = (*q)->getMessageContext();
00278 bool depsatisfied=true;
00279 string dep = (*q)->getDependency();
00280 if((*q)->getDependency() != "" && mc) {
00281 unsigned int i=0;
00282 unsigned int n=0;
00283 while(i < dep.size()) {
00284 while(i < dep.size() && dep[i] != ',') i++;
00285 LOG("localsite", 5, "--- 0 depsatisfied " << depsatisfied << " ---");
00286 LOG("localsite", 5, "dep.substr: " << dep.substr(n, i-n));
00287 depsatisfied &= mc->replyRecordContains(dep.substr(n, i-n));
00288 LOG("localsite", 5, "1 depsatisfied " << depsatisfied);
00289 n=++i;
00290 }
00291 }
00292 LOG("localsite", 5, "2 depsatisfied " << depsatisfied);
00293 bool removeQ = false;
00294 if(depsatisfied) {
00295 map<string, RemoteSite*>::iterator r;
00296 vRef<Site> s = (*q)->getSourceSite();
00297 if(&s && (s->getGreeted() || ((*q)->getMethod() == "core:hello" && (*q)->getType() == "message")))
00298 {
00299 URL to((*q)->getTo());
00300 if(! pendingValidationHosts.count(to.getHostAndPort())) {
00301 r = prioritized.find((*q)->getNonce());
00302 if(r != prioritized.end() && (*r).second == &s) prioritizedQueue.push_back(*q);
00303 else msgsReady.push_back(*q);
00304
00305 LOG("localsite", 4, "adding msg " << (*q)->incoming_debug << " to schedule with time");
00306
00307 }
00308
00309 removeQ = true;
00310
00311 } else {
00312 LOG("localsite", 4, "can't deliver mesage " << (*q)->getMethod() << " until source site greets us");
00313 if(!s->isConnected() && !s->getGreeted()) removeQ = true;
00314 }
00315 } else {
00316 vRef<Site> s = (*q)->getSourceSite();
00317 if(! s->isConnected()) removeQ = true;
00318 }
00319 if(removeQ) {
00320 list<Message*>::iterator old = q;
00321 q++;
00322 inc_q = false;
00323 (*i).second.erase(old);
00324 if(q == (*i).second.end()) {
00325 if(mc) mc->release();
00326 break;
00327 }
00328 }
00329 if(mc) mc->release();
00330 }
00331 if((*i).second.size() == 0) {
00332 map<double, list<Message*> >::iterator old = i;
00333 i++;
00334 inc_i = false;
00335 siteMessageQueue.erase(old);
00336 if(i == siteMessageQueue.end()) break;
00337 }
00338 } else break;
00339 }
00340
00341 LOG("localsite", 4, "Done building schedule (" << msgsReady.size() << ", " << prioritizedQueue.size() << ")");
00342
00343 if(msgsReady.empty() && prioritizedQueue.empty()) {
00344 LOG("localsite", 5, "Done running schedule");
00345
00346 flushNotifications();
00347 doCallbacks();
00348
00349 return;
00350 }
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371 while(! msgsReady.empty()) {
00372 vRef<Message> m = msgsReady.front();
00373 msgsReady.pop_front();
00374 vRef<Site> s = m->getSourceSite();
00375 if(&s) LOG("refcount", 5, "scheduleMessageBlock refcount on sourcesite is " << s->getCount());
00376 LOG("msgrefcount", 5, "about to send message number " << m->refcount_debug << " with count " << m->getCount());
00377
00378 for(int i = 0; i < messagefilters.size(); i++) {
00379 if(! messagefilters[i]->checkMessage(&m)) {
00380 LOG("localsite", 4, "Dropping filtered message: " << m->getLoggableString());
00381 return;
00382 }
00383 }
00384
00385 try {
00386 sendMessage(&m);
00387 } catch(runtime_error e) {
00388 LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00389 } catch(...) {
00390 LOG("localsite", 2, "sendMessage emitted unknown exception");
00391 }
00392
00393 if(!s->isConnected() && !checkScheduleHoldsSite(&s)) {
00394 Site::removeSite(*s);
00395 }
00396 }
00397
00398 runSchedule();
00399 }
00400
00401 void LocalSite::removeRemotePeer(RemoteSite* rs)
00402 {
00403 if(iteratorsUsingPeerSites == 0) {
00404 map<RemoteSite*, SiteTableEntry*>::iterator i = peerSites.find(rs);
00405 if(i != peerSites.end()) {
00406 LOG("refcount", 5, "localsite: removing remote peer, count is " << (*i).first->getCount());
00407 (*i).first->release();
00408 if((*i).second->partialMessage)
00409 (*i).second->partialMessage->release();
00410 delete (*i).second;
00411 peerSites.erase(i);
00412 }
00413 } else {
00414 boost::mutex::scoped_lock lock(peerSitesBuffer_mutex);
00415 peerSitesBuffer_remove.push_back(rs);
00416 }
00417 }
00418
00419 bool LocalSite::checkScheduleHoldsSite(Site* site)
00420 {
00421 for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin();
00422 i != siteMessageQueue.end(); i++)
00423 {
00424 for(list<Message*>::iterator n = (*i).second.begin();
00425 n != (*i).second.end(); n++)
00426 {
00427 bool ret = false;
00428 pREF(Site*, s, (*n)->getSourceSite(),
00429 if(s == site) ret = true;
00430 );
00431 if(ret) return true;
00432 }
00433 }
00434 return false;
00435 }
00436
00437 void LocalSite::sendMessage(MessageBlock* mb)
00438 {
00439 Site::sendMessage(mb);
00440 LocalVobject::sendMessage(mb);
00441 }
00442
00443 void LocalSite::verifyCheckIDPair(RemoteSocketSite* rss, const string& spoofID)
00444 {
00445 vRef<Message> nm = new Message();
00446 nm->setType("message");
00447 nm->setTo(rss->getURL().getString());
00448 nm->setMethod("core:anti-spoof-check");
00449 nm->generateNonce();
00450 nm->insertField(-1, "peek", spoofID);
00451 rss->sendMessage(&nm);
00452 }
00453
00454 void LocalSite::sendMessage(Message* m)
00455 {
00456
00457
00458
00459
00460
00461
00462 vRef<Site> sourcesite = m->getSourceSite();
00463 LOG("LocalSite::sendMessage", 3, "Incoming message: " << m->getMethod() << " (to site " << getURL().getString()
00464 << " from " << (sourcesite==0 ? "NULL" : sourcesite->getURL().getString()) << ")");
00465 LOG("LocalSite::sendMessage", 4, "<<<(incoming)<<< sending to " << getURL().getString() << " [" << m->refcount_debug
00466 << "/" << m->incoming_debug << "/" << m->getCount() << "]\n" << m->getLoggableString());
00467
00468 Site::sendMessage(m);
00469
00470 if(m->getType() == "message") {
00471 if(m->getMethod() == "core:hello") {
00472 vRef<RemoteSocketSite> st = dynamic_cast<RemoteSocketSite*>(m->getSourceSite());
00473 LOG("refcount", 5, "localsite sendMessage: got handshake, refcount on site is " << st->getCount());
00474 string checkid;
00475 bool foundFavorite = false;
00476 bool shouldGreet = true;
00477 for(int i=0; i < m->getNumFields(); i++) {
00478 const Message::Field& a=m->getField(i);
00479 if(a.key == "antispoof") {
00480 checkid = a.value;
00481 setAntiSpoofIDMapping(checkid, st->getAntiSpoofID());
00482 }
00483 if(a.key == "alias" && !hasHostAlias(a.value) && !st->checkFlag("spoof-test")) {
00484 LOG("localsite", 3, "Validating alias " << a.value);
00485 bool checkit = false;
00486 if(! getAllSites().count(a.value)) checkit = true;
00487 else {
00488 vRef<RemoteSocketSite> existingsite = meta_cast<RemoteSocketSite&>(findSite(a.value));
00489 if(!(*st == *existingsite)) {
00490 if(! existingsite->isConnected()) {
00491 removeSite(*existingsite);
00492 checkit = true;
00493 }
00494 LOG("localsite", 2, "We have a multiconnection. Verifying...");
00495 LOG("localsite", 2, "May need to bind site " << st->getURL().getString()
00496 << " to site " << a.value);
00497
00498 }
00499 }
00500 if(checkit) {
00501 bool isok = false;
00502 if(checkid != "") {
00503 string host;
00504 int port;
00505 unsigned int i;
00506 for(i = 0; i < a.value.size() && a.value[i] != ':'; i++);
00507 host = a.value.substr(0, i);
00508 if(i < a.value.size()) port = atoi(a.value.substr(i+1).c_str());
00509 else port = VOS_DEFAULT_PORT;
00510 try {
00511 vRef<RemoteSocketSite> rss = AsyncConnect::connect(host, port, this, true);
00512
00513 rss->removeHostAlias(a.value);
00514 st->addHostAlias(a.value);
00515
00516
00517 verifyCheckIDPair(&rss, st->getAntiSpoofID());
00518 st->suppressOutgoing(true);
00519
00520 rss->acquire();
00521 st->acquire();
00522 pendingValidations[&rss].realsite = st;
00523 pendingValidations[&rss].checkid = checkid;
00524 pendingValidations[&rss].hostalias = a.value;
00525 pendingValidations[&rss].favored = ! foundFavorite;
00526 pendingValidationHosts.insert(a.value);
00527 foundFavorite = true;
00528 shouldGreet = false;
00529 } catch(RemoteSite::SiteConnectionError) {
00530 LOG("localsite", 1, "Tried to verify '" << a.value << "' but could not connect.");
00531 vRef<Message> err = new Message();
00532 err->setType("message");
00533 err->setMethod("core:error");
00534 err->insertField(-1, "error", "Could not verify your hostname '"
00535 + a.value + "'");
00536 st->sendMessage(&err);
00537 }
00538 }
00539 } else {
00540 if(st->getURL().getHostAndPort() != a.value) {
00541 LOG("localsite", 1, "Attempted spoof! '" << st->getURL().getString() << "' claimed to be '"
00542 << a.value << "' but failed verification!");
00543 }
00544 }
00545 }
00546 }
00547 st->setGreeted(shouldGreet);
00548 LOG("refcount", 5, "localsite sendMessage: done with hello, refcount on site is " << st->getCount());
00549 } else if(m->getMethod() == "core:anti-spoof-check") {
00550 try {
00551 vRef<RemoteSite> st = dynamic_cast<RemoteSite*>(m->getSourceSite());
00552 const Message::Field& f = m->getField("peek");
00553 string spid = getAntiSpoofIDMapping(f.value);
00554 if(spid == "") {
00555 st->acquire();
00556 needSpoofIDreply.push_back(pair<string, RemoteSite*>(f.value, &st));
00557 } else {
00558 vRef<Message> reply = new Message();
00559 initReply(this, &reply, m, "core:anti-spoof-reply");
00560 reply->setFrom("");
00561 reply->setType("message");
00562 reply->insertField(-1, "poke", spid);
00563 st->sendMessage(&reply);
00564 }
00565 } catch(...) { }
00566 } else if(m->getMethod() == "core:anti-spoof-reply") {
00567 vRef<RemoteSocketSite> rss = dynamic_cast<RemoteSocketSite*>(m->getSourceSite());
00568 if(pendingValidations.count(&rss)) {
00569 string hostalias = pendingValidations[&rss].hostalias;
00570 vRef<RemoteSocketSite> st = pendingValidations[&rss].realsite;
00571
00572 bool isok=false;
00573 try {
00574 if(m->getField("poke").value == pendingValidations[&rss].checkid) isok = true;
00575 } catch(Message::NoSuchFieldError) {
00576 }
00577
00578 rss->handleDisconnection();
00579 if(isok) {
00580 LOG("localsite", 3, "Adding verified host alias " << hostalias
00581 << " to " << st->getURL().getString());
00582 pendingValidationHosts.erase(hostalias);
00583 if(pendingValidations[&rss].favored) {
00584 URL u = st->getURL();
00585 u.setHostAndPort(hostalias);
00586 st->setURL(u);
00587 LOG("localsite", 3, "New preferred alias: " << st->getURL().getString());
00588 }
00589 st->setGreeted(true);
00590 st->enableOutgoing(true, st->getURL().getHostAndPort());
00591 } else {
00592 LOG("localsite", 1, "Attempted spoof? Was given " << hostalias
00593 << " by " << st->getURL().getString() << " but could not verify");
00594 vRef<Message> err = new Message();
00595 err->setType("message");
00596 err->setMethod("core:error");
00597 err->insertField(-1, "error", "Could not verify your hostname '" + hostalias
00598 + "' (I'm not going to talk to you. Sorry).");
00599 st->sendMessage(&err);
00600 st->removeHostAlias(hostalias);
00601 }
00602 pendingValidations.erase(&rss);
00603 rss->release();
00604 }
00605 return;
00606 } else if(m->getMethod() == "core:error") {
00607 try {
00608 LOG("localsite", 1, "Got error: " << m->getField("error").value);
00609 } catch(Message::NoSuchFieldError) { }
00610 } else if(m->getMethod() == "core:create-object") {
00611 vRef<Message> reply = new Message();
00612 initReply(this, &reply, m, "core:set-child-update");
00613 string newobjname;
00614 deque<string> newobjtypes;
00615 try {
00616 for(int i=0; i < m->getNumFields(); i++) {
00617 const Message::Field& f = m->getField(i);
00618 if(f.key == "type") {
00619 newobjtypes.push_back(f.value);
00620 } else if(f.key == "name") {
00621 newobjname = f.value;
00622 }
00623 }
00624 } catch(Message::NoSuchFieldError) { }
00625 SiteAccessControl* sac;
00626 vRef<Site> ss = m->getSourceSite();
00627 if((sac = dynamic_cast<SiteAccessControl*>(getAccessControl()))
00628 && sac->checkCreateVobjectPermission(*ss, *this, newobjname, newobjtypes)) {
00629 vRef<LocalMetaObject> newobj =
00630 meta_cast<LocalMetaObject*>(createMetaObject(newobjname.c_str(), (char*)0));
00631
00632 for(deque<string>::iterator i = newobjtypes.begin();
00633 i != newobjtypes.end(); i++)
00634 {
00635 if(getAccessControl()->checkAddTypePermission(*ss, *i))
00636 newobj->addType(*i, &ss);
00637 }
00638 vRef<ParentChildRelation> pcr = newobj->findParent(*this);
00639 char n[16];
00640 snprintf(n, sizeof(n), "%i", pcr->position);
00641 reply->insertField(-1, "pos", n);
00642 reply->insertField(-1, "name", pcr->contextual_name);
00643 reply->insertField(-1, "path", newobj->getURL().getString());
00644 const TypeSet& ts = newobj->getTypes();
00645 if(ts.size() > 0) {
00646 for(TypeSet::const_iterator ti = ts.begin(); ti != ts.end(); ti++) {
00647 reply->insertField(-1, "type", *ti);
00648 }
00649 } else reply->insertField(-1, "type", "");
00650 try {
00651 ss->sendMessage(&reply);
00652 } catch(...) { }
00653
00654 LOG("refcounted", 5, "Done creating meta object, count (after release) is " << (newobj->getCount()-1));
00655 } else {
00656 if(!sac) LOG("accesscontrol", 2, "Warning! " << getURL().getString()
00657 << " has no access control policy! Failing by default.");
00658 reply->insertField(-1, "error", "Permission denied");
00659 }
00660 }
00661 }
00662
00663 if(&sourcesite && sourcesite->checkFlag("spoof-test")) return;
00664
00665 try {
00666 URL to(m->getTo());
00667 URL from(m->getFrom());
00668
00669 if(! getAllSites().count(to.getHostAndPort())) {
00670 LOG("localsite", 3, "dropping message with unknown to field " << to.getString());
00671 return;
00672 }
00673
00674 map<string, Site*>::const_iterator sti = getAllSites().find(from.getHostAndPort());
00675 vRef<Site> fromsite;
00676 if(sti != getAllSites().end()) {
00677 fromsite = (*sti).second;
00678 fromsite->acquire();
00679 }
00680 if(! &fromsite) {
00681 LOG("localsite", 3, "dropping message with unknown from field " << from.getString());
00682 return;
00683 }
00684
00685 LOG("refcount", 5, "localsite sendMessage: fromsite refcount is " << fromsite->getCount());
00686 vRef<Site> st = m->getSourceSite();
00687 if(! &st) {
00688 LOG("localsite", 3, "dropping message with no sourcesite!");
00689 return;
00690 }
00691
00692 if(&fromsite != &st && st->isRemote()) {
00693 if(!st->checkFlag("spoof-test") && m->getMethod() != "core:type-add-update") {
00694 LOG("sendMessage", 2, "Note: site in from field \"" << fromsite->getURL().getString()
00695 << "\" and actual source site \"" << st->getURL().getString()
00696 << "\" are not the same object");
00697 LOG("sendMessage", 3, "error happened on message " << m->getLoggableString());
00698 }
00699 return;
00700 }
00701 if(peerSites.count(dynamic_cast<RemoteSite*>(&fromsite))) {
00702 if(peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName == "") {
00703 peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName = to.getHostAndPort();
00704 }
00705 if(peerSites[dynamic_cast<RemoteSite*>(&fromsite)]->localPeerName == to.getHostAndPort()) {
00706 to.setHostAndPort(getURL().getHostAndPort());
00707 m->setTo(to.getString());
00708 }
00709 }
00710 vRef<Vobject> vob = Vobject::findObjectFromRoot(m->getTo());
00711 if(!(dynamic_cast<LocalSite*>(&vob) == this)) {
00712 try {
00713 vob->sendMessage(m);
00714 } catch(runtime_error e) {
00715 LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00716 return;
00717 } catch(...) {
00718 LOG("localsite", 2, "sendMessage emitted unknown exception");
00719 return;
00720 }
00721 }
00722 } catch(NoSuchSiteError) {
00723 LOG("localsite", 3, "No such site on to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\") "
00724 << m->getLoggableString());
00725 return;
00726 } catch(NoSuchObjectError) {
00727 LOG("localsite", 3, "No such to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\") object");
00728 return;
00729 } catch(URL::BadURLError) {
00730 LOG("localsite", 3, "Bad to (\"" << m->getTo() << "\") or from (\"" << m->getFrom() << "\") field");
00731 return;
00732 }
00733
00734 if(m->getType() == "update") {
00735 try {
00736 vRef<Vobject> target = Vobject::findObjectFromRoot(m->getFrom());
00737 LOG("refcount", 5, "about to deliver to update Message, count on msg is " << m->getCount());
00738 target->sendUpdateMessage(m);
00739 LOG("refcount", 5, "did deliver to updateMessage, count on msg is " << m->getCount());
00740 } catch(NoSuchObjectError) {
00741 return;
00742 } catch(runtime_error e) {
00743 LOG("localsite", 2, "sendMessage emitted exception: " << e.what());
00744 return;
00745 } catch(...) {
00746 LOG("localsite", 2, "sendMessage emitted unknown exception");
00747 return;
00748 }
00749 }
00750
00751 LocalMetaObject::sendMessage(m);
00752
00753 LOG("localsite", 4, "did deliver to sendMessage, count on msg is " << m->getCount());
00754 }
00755
00756 void LocalSite::setURL(const URL& u)
00757 {
00758 url = u;
00759 }
00760
00761 void LocalSite::setTimeoutOnSelect(double sec)
00762 {
00763 delete selectwait;
00764 if(sec < 0) selectwait = 0;
00765 else {
00766 selectwait = new struct timeval();
00767 selectwait->tv_sec = (long)floor(sec);
00768 selectwait->tv_usec = (long)((sec - floor(sec))*1000000.0);
00769 }
00770 }
00771
00772 double LocalSite::getTimeoutOnSelect()
00773 {
00774 if(selectwait) return (((double)selectwait->tv_sec) + (((double)selectwait->tv_usec) / 1000000.0));
00775 else return -1;
00776 }
00777
00778 void LocalSite::addPrioritizedNonce(const string& nonce, RemoteSite* source)
00779 {
00780 source->acquire();
00781 LOG("refcount", 5, "addPrioritizedNonce for " << nonce << " acquired " << source->getURL().getString()
00782 << " count is now " << source->getCount());
00783 prioritized[nonce] = source;
00784 }
00785
00786 void LocalSite::removePrioritizedNonce(const string& nonce, RemoteSite* source)
00787 {
00788 map<string, RemoteSite*>::iterator i = prioritized.find(nonce);
00789 if(i != prioritized.end() && (*i).second == source) {
00790 prioritized.erase(i);
00791 LOG("refcount", 5, "removePrioritizedNonce released " << source->getURL().getString()
00792 << " count is now " << (source->getCount()-1));
00793 source->release();
00794 }
00795 }
00796
00797
00798 void LocalSite::extendMetaObject(LocalMetaObject* root, const char* t) {
00799 if(localObjectExtensionTable.count(t) == 0) {
00800 root->addType(t);
00801 } else {
00802 pair<MI, MI> range = localObjectExtensionTable.equal_range(t);
00803 for(MI it = range.first; it != range.second; it++) {
00804 MetaObject* newext = (*it).second(root, t);
00805 root->addType(newext->getType());
00806 }
00807 }
00808 }
00809
00810
00811 MetaObject* LocalSite::createMetaObject(const char* desiredName, VobjectAccessControl* ac, const deque<string>& typelist)
00812 {
00813 string name = uniqueName(desiredName);
00814 if(!ac) ac = getAccessControl();
00815 LocalMetaObject* root = new LocalMetaObject(name, this, ac);
00816 LocalVobject::insertChild(-1, root->getName(), root);
00817 for(deque<string>::const_iterator i = typelist.begin(); i != typelist.end() ; i++) {
00818 extendMetaObject(root, (*i).c_str());
00819 }
00820 return root;
00821 }
00822
00823 MetaObject* LocalSite::createMetaObject(const char* name, const deque<string>& typelist) {
00824 return createMetaObject(name, 0, typelist);
00825 }
00826
00827 MetaObject* LocalSite::createMetaObject(const char* desiredName, VobjectAccessControl* ac, const char* firstType, ...)
00828 {
00829 string name = uniqueName(desiredName);
00830 if(!ac)
00831 ac = getAccessControl();
00832 LocalMetaObject* root = new LocalMetaObject(name, this, ac);
00833 LocalVobject::insertChild(-1, root->getName(), root);
00834 if(!firstType)
00835 return root;
00836 extendMetaObject(root, firstType);
00837 va_list ap;
00838 va_start(ap, firstType);
00839 for(;;) {
00840 char* p=va_arg(ap, char*);
00841 if(p == 0) break;
00842 extendMetaObject(root, p);
00843 }
00844 va_end(ap);
00845 return root;
00846 }
00847
00848
00849 MetaObject* LocalSite::createMetaObject(const char* desiredName, const char* first, ...)
00850 {
00851 string name = uniqueName(desiredName);
00852 LocalMetaObject* root = new LocalMetaObject(name, this, getAccessControl());
00853 LocalVobject::insertChild(-1, root->getName(), root);
00854 if(!first)
00855 return root;
00856 extendMetaObject(root, first);
00857 va_list ap;
00858 va_start(ap, first);
00859 for(;;) {
00860 char* t = va_arg(ap, char*);
00861 if(t == 0)
00862 break;
00863 extendMetaObject(root, t);
00864 }
00865 va_end(ap);
00866 return root;
00867 }
00868
00869
00870 void LocalSite::addLocalObjectExtension(const char* type, metaobject_extender_t newmethod)
00871 {
00872 localObjectExtensionTable.insert(multimap<string, metaobject_extender_t>::value_type(string(type), newmethod));
00873 }
00874
00875 bool LocalSite::hasLocalObjectExtension(const char* type)
00876 {
00877 return (localObjectExtensionTable.count(type) > 0);
00878 }
00879
00880 void LocalSite::removeLocalObjectExtension(const char* type, metaobject_extender_t oldmethod)
00881 {
00882 pair<MI, MI> g = localObjectExtensionTable.equal_range(type);
00883 for(MI it = g.first; it != g.second; it++) {
00884 if((*it).second == oldmethod) {
00885 localObjectExtensionTable.erase(it);
00886 break;
00887 }
00888 }
00889 }
00890
00891 void LocalSite::printExtensionTable(ostream& stream) {
00892 for(MI i = localObjectExtensionTable.begin(); i != localObjectExtensionTable.end(); i++) {
00893 stream << i->first << endl;
00894 }
00895 }
00896
00897 void LocalSite::addNotification(NotifyEvent* ev)
00898 {
00899 notifyEvents.push(ev);
00900 }
00901
00902 void LocalSite::lockNotificationFlush()
00903 {
00904 notifyflushlock = true;
00905 }
00906
00907 void LocalSite::unlockNotificationFlush()
00908 {
00909 notifyflushlock = false;
00910 flushNotifications();
00911 }
00912
00913 void LocalSite::flushNotifications()
00914 {
00915 if(! notifyflushlock) {
00916 LOG("localsite", 5, "Flushing notifications (queue size is " << notifyEvents.size() << ")");
00917 while(! notifyEvents.empty()) {
00918 NotifyEvent* n = notifyEvents.front();
00919 notifyEvents.pop();
00920 try {
00921 n->notify();
00922 } catch(runtime_error x) {
00923 LOG("localsite", 0, "call to notify emitted exception: " << x.what());
00924 } catch(...) { }
00925 delete n;
00926 }
00927 LOG("localsite", 5, "Done flushing notifications");
00928 }
00929 }
00930
00931
00932 void LocalSite::insertChild(int position, const string& contextual_name, Vobject* child)
00933 throw (AccessControlError, RemoteError)
00934 {
00935 throw AccessControlError("Impossible to do insertChild on a site, try createObject");
00936 }
00937
00938 void LocalSite::setChild(int position, const string& contextual_name, Vobject* child)
00939 throw (AccessControlError, RemoteError)
00940 {
00941 throw AccessControlError("Impossible to do setChild on a site, try createObject");
00942 }
00943
00944 void LocalSite::removeChild(int position)
00945 throw (AccessControlError, RemoteError)
00946 {
00947 throw AccessControlError("Impossible to do removeChild on a site, try excise");
00948 }
00949
00950 void LocalSite::takeOverMessages(Site* was, Site* now)
00951 {
00952 for(map<double, list<Message*> >::iterator i = siteMessageQueue.begin();
00953 i != siteMessageQueue.end();
00954 i++)
00955 {
00956 for(list<Message*>::iterator n = (*i).second.begin();
00957 n != (*i).second.end();
00958 n++)
00959 {
00960 vRef<Site> s = (*n)->getSourceSite();
00961 if(&s == was) (*n)->setSourceSite(now);
00962 }
00963 }
00964 }
00965
00966
00967
00968
00969
00970
00971
00972
00973
00974 void LocalSite::setPrimaryHostname(string s)
00975 {
00976 if(s != url.getHost()) {
00977 url.setHost(s);
00978 addHostAlias(url.getHostAndPort());
00979 for(unsigned int i = 0; i < children.size(); i++) {
00980 if(LocalVobject* v = dynamic_cast<LocalVobject*>(children[i]->child)) {
00981 URL u = v->getURL();
00982 u.setHost(s);
00983 v->setURL(u);
00984 }
00985 }
00986 }
00987 }