00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "config.h"
00026
00027 #include <stdexcept>
00028 #include <set>
00029 #include <string>
00030 #include <iostream>
00031 #include <map>
00032 #include <deque>
00033 using namespace std;
00034
00035 #include <sys/types.h>
00036 #include <sys/time.h>
00037 #include <unistd.h>
00038 #include <sys/socket.h>
00039 #include <sys/select.h>
00040 #include <netinet/in.h>
00041 #include <netinet/tcp.h>
00042 #include <netdb.h>
00043 #include <stdio.h>
00044 #include <errno.h>
00045 #include <arpa/inet.h>
00046 #include <math.h>
00047
00048 #if defined(HAVE_BOOST_REGEX_H) && defined(HAVE_LIBBOOST)
00049 # include <boost/regex.h>
00050 #else
00051 extern "C" {
00052 # include <regex.h>
00053 }
00054 #endif
00055
00056
00057 #if !defined(HAVE_SOCKLEN_T) && !defined(socklen_t)
00058 typedef int socklen_t;
00059 #endif
00060
00061
00062 #include "servicedirectory.hh"
00063 using namespace LSD;
00064
00065 #define WARNING(s) { cerr << "libservicediscovery: Warning: " << s << endl; }
00066
00067 #ifdef DEBUG_MODE
00068 # define DEBUG(s) { cerr << "libservicediscovery: " << s << endl; }
00069 #else
00070 # define DEBUG(s)
00071 #endif
00072
00073 extern "C" {
00074 #include "packet.h"
00075 }
00076
00077 void respondToMessage(
00078 set<Service*>& services,
00079 map<int, ServiceDirectory::Query>& queries,
00080 pack_ubyte* buf, unsigned int buf_len,
00081 int fd, sockaddr_in* skaddr, socklen_t sa_len
00082 );
00083
00084 class ServiceMatch {
00085 private:
00086 regex_t typereg;
00087 regex_t titlereg;
00088 regex_t descreg;
00089 regex_t urlreg;
00090 bool typeisdotstar;
00091 public:
00092 ServiceMatch(const std::string& typepattern,
00093 const std::string& titlepattern,
00094 const std::string& descpattern,
00095 const std::string& urlpattern);
00096 ~ServiceMatch();
00097 bool match(const Service& s);
00098 };
00099
00100 ServiceMatch::ServiceMatch(const string& typepattern,
00101 const string& titlepattern,
00102 const string& descpattern,
00103 const string& urlpattern)
00104 {
00105 if(typepattern == "") {
00106 regcomp(&typereg, ".*", REG_EXTENDED | REG_NOSUB);
00107 typeisdotstar = true;
00108 }
00109 else {
00110 typeisdotstar = false;
00111 regcomp(&typereg, typepattern.c_str(), REG_EXTENDED | REG_NOSUB);
00112 }
00113
00114 if(titlepattern == "") regcomp(&titlereg, ".*", REG_EXTENDED | REG_NOSUB);
00115 else regcomp(&titlereg, titlepattern.c_str(), REG_EXTENDED | REG_NOSUB);
00116
00117 if(descpattern == "") regcomp(&descreg, ".*", REG_EXTENDED | REG_NOSUB);
00118 else regcomp(&descreg, descpattern.c_str(), REG_EXTENDED | REG_NOSUB);
00119
00120 if(urlpattern == "") regcomp(&urlreg, ".*", REG_EXTENDED | REG_NOSUB);
00121 else regcomp(&urlreg, urlpattern.c_str(), REG_EXTENDED | REG_NOSUB);
00122 }
00123
00124 ServiceMatch::~ServiceMatch()
00125 {
00126 regfree(&typereg);
00127 regfree(&titlereg);
00128 regfree(&descreg);
00129 regfree(&urlreg);
00130 }
00131
00132 bool ServiceMatch::match(const Service& sv)
00133 {
00134 bool matchtype = typeisdotstar;
00135 for(set<string>::iterator ti = sv.types.begin(); ti != sv.types.end(); ti++) {
00136 if(regexec(&typereg, (*ti).c_str(), 0, 0, 0) == 0) {
00137 matchtype=true;
00138 break;
00139 }
00140 }
00141 return (matchtype
00142 && (sv.title.size() == 0 || regexec(&titlereg, sv.title.c_str(), 0, 0, 0) == 0)
00143 && (sv.description.size() == 0 || regexec(&descreg, sv.description.c_str(), 0, 0, 0) == 0)
00144 && (sv.url.size() == 0 || regexec(&urlreg, sv.url.c_str(), 0, 0, 0) == 0));
00145 }
00146
00147
00148
00149 short ServiceDirectory::LISTEN_BCAST = 0x1;
00150 short ServiceDirectory::LISTEN_TCP = 0x2;
00151 short ServiceDirectory::RELAY_TCP_TO_BCAST = 0x4;
00152 short ServiceDirectory::RELAY_BCAST_TO_TCP = 0x8;
00153 short ServiceDirectory::RELAY_TCP_TO_REMOTE = 0x16;
00154
00155
00156 ServiceDirectory::ServiceDirectory(short opt) {
00157 DEBUG("new service directory (opt=" << opt << ")");
00158 udp_port = 4230;
00159 if(opt & LISTEN_BCAST)
00160 listenUDP();
00161 else
00162 udp_fd = -1;
00163 if(opt & LISTEN_TCP)
00164 listenTCP();
00165 else
00166 tcp_fd = -1;
00167 relay_tcp_local = (opt & RELAY_TCP_TO_BCAST);
00168 relay_udp_remote = (opt & RELAY_BCAST_TO_TCP);
00169 relay_tcp_remote = (opt & RELAY_TCP_TO_REMOTE);
00170 }
00171
00172 ServiceDirectory::ServiceDirectory() {
00173 DEBUG("new service directory (default options)");
00174 udp_port = 4230;
00175 listenUDP();
00176 tcp_fd = -1;
00177 relay_tcp_local = true;
00178 relay_udp_remote = true;
00179 relay_tcp_remote = false;
00180 }
00181
00182
00183
00184
00185 void ServiceDirectory::listenUDP(int port) {
00186
00187 if(port == -1 && udp_fd != -1) {
00188 shutdown(udp_fd, SHUT_RDWR);
00189 udp_fd = -1;
00190 return;
00191 }
00192
00193 udp_fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
00194 struct sockaddr_in skaddr;
00195 int opt_bcast, opt_report, opt_readdr;
00196 opt_bcast = opt_report = opt_readdr = 1;
00197 setsockopt(udp_fd, SOL_SOCKET, SO_BROADCAST, &opt_bcast, sizeof(int));
00198 #ifdef SO_REUSEPORT
00199 setsockopt(udp_fd, SOL_SOCKET, SO_REUSEPORT, &opt_report, sizeof(int));
00200 #else
00201 #warning This platform is missing SO_REUSEPORT. Will just SO_REUSEADDR work?
00202 #endif
00203 setsockopt(udp_fd, SOL_SOCKET, SO_REUSEADDR, &opt_readdr, sizeof(int));
00204 skaddr.sin_family = AF_INET;
00205 skaddr.sin_port = htons(port);
00206 skaddr.sin_addr.s_addr = 0;
00207 bind(udp_fd, (struct sockaddr*)&skaddr, sizeof(struct sockaddr_in));
00208 udp_port = port;
00209 DEBUG("listening on udp port " << port);
00210 }
00211
00212 ServiceDirectory::~ServiceDirectory() {
00213 if(udp_fd != -1)
00214 shutdown(udp_fd, SHUT_RDWR);
00215 for( map<pair<char*, int>, int>::const_iterator i = tcp_relays.begin();
00216 i != tcp_relays.end();
00217 i++)
00218 {
00219 shutdown(i->second, SHUT_RDWR);
00220 tcp_relays.erase(i->first);
00221 }
00222 for(deque<int>::const_iterator i = incoming_tcp_fds.begin();
00223 i != incoming_tcp_fds.end(); i++) {
00224 shutdown(*i, SHUT_RDWR);
00225 }
00226 if(tcp_fd != -1)
00227 shutdown(tcp_fd, SHUT_RDWR);
00228 }
00229
00230 int ServiceDirectory::query(const string& typepattern,
00231 const string& titlepattern,
00232 const string& descpattern,
00233 const string& urlpattern,
00234 ServiceAdvertismentListener* cb)
00235 {
00236
00237 int r;
00238 do {
00239 r = rand();
00240 } while(queries.count(r));
00241 unsigned char pkt[2048];
00242 int pkt_len;
00243 pkt_len = pack(pkt, sizeof(pkt), "cclSSSS",
00244 0, 'Q', r,
00245 typepattern.c_str(), typepattern.size(),
00246 titlepattern.c_str(), titlepattern.size(),
00247 descpattern.c_str(), descpattern.size(),
00248 urlpattern.c_str(), urlpattern.size()
00249 );
00250
00251
00252 if(udp_fd != -1) {
00253 struct sockaddr_in skaddr;
00254 skaddr.sin_family = AF_INET;
00255 skaddr.sin_port = htons(udp_port);
00256 skaddr.sin_addr.s_addr = INADDR_BROADCAST;
00257 DEBUG("sending query message to udp bcast...");
00258 sendto(udp_fd, pkt, pkt_len, 0, (struct sockaddr*)&skaddr, sizeof(struct sockaddr_in));
00259 }
00260
00261
00262 for( map<pair<char*, int>, int>::const_iterator i = tcp_relays.begin();
00263 i != tcp_relays.end();
00264 i++)
00265 {
00266 int fd = i->second;
00267 DEBUG("sending query to tcp socket " << fd);
00268 if( send(fd, pkt, pkt_len, 0) < 0 ) {
00269 removeRelay(i->first.first, i->first.second);
00270 }
00271 }
00272
00273
00274
00275 Query q;
00276 q.typepattern = typepattern;
00277 q.titlepattern = titlepattern;
00278 q.descpattern = descpattern;
00279 q.urlpattern = urlpattern;
00280 q.listener = cb;
00281 queries[r] = q;
00282
00283 return r;
00284 }
00285
00286 void ServiceDirectory::endQuery(int q)
00287 {
00288 queries.erase(q);
00289 }
00290
00291 void ServiceDirectory::handleIncoming(float timeout)
00292 {
00293 DEBUG("handleIncoming...");
00294 fd_set readset;
00295 FD_ZERO(&readset);
00296 int max_fd = -1;
00297 if(udp_fd != -1) {
00298 FD_SET(udp_fd, &readset);
00299 max_fd = udp_fd;
00300 }
00301 if(tcp_fd != -1) {
00302 FD_SET(tcp_fd, &readset);
00303 max_fd = (tcp_fd > max_fd)?(tcp_fd):(max_fd);
00304 }
00305 for(deque<int>::const_iterator i = incoming_tcp_fds.begin();
00306 i != incoming_tcp_fds.end(); i++) {
00307 if(*i != -1) {
00308 FD_SET(*i, &readset);
00309 max_fd = (*i > max_fd)?(*i):(max_fd);
00310 }
00311 }
00312
00313 if(max_fd == -1) {
00314 DEBUG("handleIncoming: no sockets; returning");
00315 return;
00316 }
00317
00318 struct timeval tv = {0, 0};
00319 if(timeout > 0) {
00320 tv.tv_sec = (long)floor(timeout);
00321 tv.tv_usec = (long)( (timeout - tv.tv_sec) * 1000000.0 );
00322 }
00323 select( max_fd+1, &readset, 0, 0, ( (timeout >= 0) ? (&tv) : NULL ) );
00324
00325 pack_ubyte buf[2048];
00326 struct sockaddr_in skaddr;
00327 socklen_t sa_len = sizeof(struct sockaddr_in);
00328 memset(&skaddr, 0, sa_len);
00329
00330
00331 if( (udp_fd != -1) && FD_ISSET(udp_fd, &readset)) {
00332 DEBUG("handleIncoming: udp socket has data.");
00333 int r = recvfrom(udp_fd, buf, sizeof(buf), 0, (struct sockaddr*)&skaddr, &sa_len);
00334 if(r > 0) {
00335
00336
00337 respondToMessage(services, queries, buf, r, udp_fd, &skaddr, sa_len);
00338
00339
00340 if(relay_udp_remote) {
00341 for(map<pair<char*, int>, int>::const_iterator i = tcp_relays.begin();
00342 i != tcp_relays.end();
00343 i++)
00344 {
00345 int fd = i->second;
00346 DEBUG("relaying message to tcp socket " << fd);
00347 if( send(fd, buf, r, 0) < 0 ) {
00348 removeRelay(i->first.first, i->first.second);
00349 }
00350 }
00351 }
00352 } else DEBUG("no data!");
00353 }
00354
00355
00356
00357 if( (tcp_fd != -1) && FD_ISSET(tcp_fd, &readset) ) {
00358 int fd = accept(tcp_fd, 0, 0);
00359 if(fd != -1) {
00360 DEBUG("accepted incoming socket " << fd);
00361 incoming_tcp_fds.push_back(fd);
00362 }
00363 }
00364
00365 for(deque<int>::iterator i = incoming_tcp_fds.begin(); i!= incoming_tcp_fds.end(); i++) {
00366 if( (*i != -1) && FD_ISSET(*i, &readset)) {
00367 DEBUG("handleIncoming: tcp socket " << *i << " has data.");
00368 int r = recv(*i, buf, sizeof(buf), 0);
00369 if(r > 0) {
00370
00371
00372 respondToMessage(services, queries, buf, r, *i, NULL, 0);
00373
00374
00375 if(relay_tcp_local) {
00376 skaddr.sin_family = AF_INET;
00377 skaddr.sin_port = htons(udp_port);
00378 skaddr.sin_addr.s_addr = INADDR_BROADCAST;
00379 skaddr.sin_port = htons(udp_port);
00380 DEBUG("relaying message to udp bcast...");
00381 int s = sendto(udp_fd, buf, r, 0, (struct sockaddr*)&skaddr, sizeof(struct sockaddr_in));
00382 if(s < 0)
00383 WARNING("Error sending UDP broadcast message.");
00384 }
00385
00386 if(relay_udp_remote) {
00387 for( map<pair<char*, int>, int>::const_iterator i = tcp_relays.begin();
00388 i != tcp_relays.end();
00389 i++)
00390 {
00391 int fd = i->second;
00392 DEBUG("relaying message to tcp socket " << fd);
00393 if( send(fd, buf, r, 0) < 0 ) {
00394 removeRelay(i->first.first, i->first.second);
00395 }
00396 }
00397 }
00398
00399 } else {
00400 WARNING("Error reading from incoming TCP-relay socket.");
00401 shutdown(*i, SHUT_RDWR);
00402 *i = -1;
00403
00404 }
00405 }
00406 }
00407 }
00408
00409 void respondToMessage(set<Service*>& services, map<int, ServiceDirectory::Query>& queries, pack_ubyte* buf, unsigned int buf_len, int fd, sockaddr_in* skaddr, socklen_t sa_len ) {
00410 DEBUG("respondToMessage: got message (length " << buf_len << ")");
00411
00412 pack_ubyte reply_buf[2048];
00413 char version, msgtype;
00414 int nonce;
00415 int p = unpack(buf, buf_len, "ccl", &version, &msgtype, &nonce);
00416 if(msgtype == 'Q') {
00417 char *typepattern, *titlepattern, *descpattern, *urlpattern;
00418 int tylen, tilen, dlen, ulen;
00419
00420 unpack(buf + 6, buf_len - 6, "SSSS",
00421 &typepattern, &tylen,
00422 &titlepattern, &tilen,
00423 &descpattern, &dlen,
00424 &urlpattern, &ulen);
00425
00426 DEBUG("respondToMessage: got Query. type=" << typepattern << " title=" << titlepattern << " desc=" << descpattern << " url=" << urlpattern);
00427
00428
00429 ServiceMatch sm(string(typepattern, tylen),
00430 string(titlepattern, tilen),
00431 string(descpattern, dlen),
00432 string(urlpattern, ulen));
00433 for(set<Service*>::iterator si = services.begin(); si != services.end(); si++) {
00434 if(sm.match(**si)) {
00435
00436
00437 int p = pack(reply_buf, sizeof(reply_buf), "ccls", 0, 'A', nonce, (*si)->types.size());
00438 for(set<string>::iterator ti = (*si)->types.begin(); ti != (*si)->types.end(); ti++) {
00439 p += pack(reply_buf + p, sizeof(reply_buf) - p, "S", (*ti).c_str(), (*ti).size());
00440 }
00441 p += pack(reply_buf + p, sizeof(reply_buf) - p, "SSS",
00442 (*si)->title.c_str(), (*si)->title.size(),
00443 (*si)->description.c_str(), (*si)->description.size(),
00444 (*si)->url.c_str(), (*si)->url.size());
00445 DEBUG("Built a reply to the query, with URL " << (*si)->url.c_str() << " (length " << (*si)->url.size() << ")");
00446
00447
00448
00449 if(skaddr) {
00450 DEBUG("respondToMessage: sending query-reply announcement message to " << fd << " - addr: " << inet_ntoa(skaddr->sin_addr) << " - msg(" << p << ")" );
00451 sendto(fd, reply_buf, p, 0, (struct sockaddr*)skaddr, sa_len);
00452 } else {
00453 DEBUG("respondToMessage: sending query-reply announcement to " << fd);
00454 send(fd, reply_buf, p, 0);
00455 }
00456
00457 }
00458 }
00459
00460 free(typepattern);
00461 free(titlepattern);
00462 free(descpattern);
00463 free(urlpattern);
00464
00465 } else if(msgtype == 'A') {
00466 DEBUG("respondToMessage: got Advertisment");
00467 Service sv;
00468
00469 pack_uint16 ntypes;
00470 p += unpack(buf + p, buf_len - p, "s", &ntypes);
00471 int i, len;
00472 for(i = 0; i < ntypes; i++) {
00473 char* s;
00474 p += unpack(buf + p, buf_len - p, "S", &s, &len);
00475 sv.types.insert(string(s, len));
00476 }
00477
00478 char *title, *desc, *url;
00479 int tlen, dlen, ulen;
00480 unpack(buf + p, buf_len - p, "SSS",
00481 &title, &tlen,
00482 &desc, &dlen,
00483 &url, &ulen);
00484 sv.title = string(title, tlen);
00485 sv.description = string(desc, dlen);
00486 DEBUG("unpacking advertisement message... URL length is " << ulen);
00487 sv.url = string(url, ulen);
00488 DEBUG("URL is " << sv.url);
00489
00490 if(nonce == 0) {
00491 for(map<int, ServiceDirectory::Query>::iterator i = queries.begin(); i != queries.end(); i++) {
00492 ServiceMatch sm((*i).second.typepattern,
00493 (*i).second.titlepattern,
00494 (*i).second.descpattern,
00495 (*i).second.urlpattern);
00496 if(sm.match(sv)) {
00497 (*i).second.listener->notifyNewService((*i).first, sv);
00498 }
00499 }
00500 } else if(queries.count(nonce)) queries[nonce].listener->notifyNewService(nonce, sv);
00501 }
00502 }
00503
00504 void ServiceDirectory::addService(Service* s)
00505 {
00506 DEBUG("Advertising service. URL is " << s->url << " (length " << s->url.size() << ")");
00507
00508
00509 pack_ubyte buf[2048];
00510 int p = pack(buf, sizeof(buf), "ccls", 0, 'A', 0, s->types.size());
00511 for(set<string>::iterator ti = s->types.begin(); ti != s->types.end(); ti++) {
00512 p += pack(buf + p, sizeof(buf) - p, "S", (*ti).c_str(), (*ti).size());
00513 }
00514 p += pack(buf + p, sizeof(buf) - p, "SSS",
00515 s->title.c_str(), s->title.size(),
00516 s->description.c_str(), s->description.size(),
00517 s->url.c_str(), s->url.size());
00518
00519
00520 if(udp_fd != -1) {
00521 struct sockaddr_in skaddr;
00522 skaddr.sin_family = AF_INET;
00523 skaddr.sin_port = htons(udp_port);
00524 skaddr.sin_addr.s_addr = INADDR_BROADCAST;
00525 DEBUG("sending announcement message to udp bcast...");
00526 sendto(udp_fd, buf, p, 0, (struct sockaddr*)&skaddr, sizeof(struct sockaddr_in));
00527 }
00528
00529
00530 for( map<pair<char*, int>, int>::const_iterator i = tcp_relays.begin();
00531 i != tcp_relays.end();
00532 i++)
00533 {
00534 int fd = i->second;
00535 DEBUG("sending announcement to tcp socket " << fd);
00536 if( send(fd, buf, p, 0) < 0 ) {
00537 removeRelay(i->first.first, i->first.second);
00538 }
00539 }
00540
00541
00542 services.insert(s);
00543 }
00544
00545 void ServiceDirectory::removeService(Service* s)
00546 {
00547 services.erase(s);
00548 }
00549
00550 void ServiceDirectory::listenTCP(int port) {
00551 if( port == -1 && tcp_fd != -1) {
00552 shutdown(tcp_fd, SHUT_RDWR);
00553 tcp_fd = -1;
00554 return;
00555 }
00556 tcp_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
00557 struct sockaddr_in addr;
00558 memset(&addr, 0, sizeof(addr));
00559 addr.sin_family = AF_INET;
00560 addr.sin_port = htons(port);
00561 addr.sin_addr.s_addr = 0;
00562 bind(tcp_fd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in));
00563 if(tcp_fd < 0)
00564 throw runtime_error("Couldn't bind port (listenTCP): " + string(strerror(errno)));
00565 if( listen(tcp_fd, 5) < 0 )
00566 throw runtime_error("Couldn't listen on port (listenTCP): " + string(strerror(errno)));
00567 DEBUG("listening on tcp socket " << tcp_fd);
00568 }
00569
00570 void ServiceDirectory::addRelay(char* hostname, int port) {
00571 int fd;
00572 fd = socket(PF_INET, SOCK_STREAM, 0);
00573 struct hostent* host;
00574 host = gethostbyname(hostname);
00575 if(!host)
00576 throw runtime_error("Error looking up hostname \"" + string(hostname) + "\": " + strerror(errno));
00577 struct sockaddr_in addr;
00578 memset(&addr, 0, sizeof(addr));
00579 addr.sin_family = AF_INET;
00580 addr.sin_port = htons(port);
00581 addr.sin_addr = *((struct in_addr*)host->h_addr);
00582 int c = connect(fd, (struct sockaddr*)&addr, sizeof(addr));
00583 if(c < 0) {
00584 char ps[16];
00585 snprintf(ps, sizeof(ps), "%d", port);
00586 throw runtime_error("Error connecting to " + string(hostname) + ":" + ps + ": " + strerror(errno));
00587 }
00588 pair<char*, int> p;
00589 p.first = hostname;
00590 p.second = port;
00591 tcp_relays[p] = fd;
00592 }
00593
00594 void ServiceDirectory::removeRelay(char* hostname, int port) {
00595 pair<char*, int> p;
00596 p.first = hostname;
00597 p.second = port;
00598 if( tcp_relays.find(p) == tcp_relays.end() )
00599 return;
00600 shutdown(tcp_relays[p], SHUT_RDWR);
00601 tcp_relays.erase(p);
00602 }
00603
00604 deque<int> ServiceDirectory::getRelayFds() {
00605 deque<int> r;
00606 for( map<pair<char*, int>, int>::const_iterator i = tcp_relays.begin();
00607 i != tcp_relays.end();
00608 i++)
00609 {
00610 r.push_back(i->second);
00611 }
00612 return r;
00613 }
00614
00615 deque<int> ServiceDirectory::getIncomingTCPFds() {
00616 return incoming_tcp_fds;
00617 }
00618
00619