00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021
00022 #ifdef __linux__
00023
00024 #include <sys/poll.h>
00025 #include <stdlib.h>
00026 #include <sys/types.h>
00027 #include <sys/socket.h>
00028 #include <netinet/in.h>
00029 #include <net/ethernet.h>
00030 #include <netpacket/packet.h>
00031 #include <sys/ioctl.h>
00032
00033 #include <oasys/io/NetUtils.h>
00034 #include <oasys/io/IO.h>
00035 #include <oasys/thread/Timer.h>
00036 #include <oasys/util/OptParser.h>
00037 #include <oasys/util/StringBuffer.h>
00038
00039 #include "EthConvergenceLayer.h"
00040 #include "bundling/Bundle.h"
00041 #include "bundling/BundleEvent.h"
00042 #include "bundling/BundleDaemon.h"
00043 #include "bundling/BundleList.h"
00044 #include "bundling/BundleProtocol.h"
00045 #include "contacts/ContactManager.h"
00046 #include "contacts/Link.h"
00047
00048 using namespace oasys;
00049 namespace dtn {
00050
00051 struct EthConvergenceLayer::Params EthConvergenceLayer::defaults_;
00052
00053
00054
00055
00056
00057
00058
00059 EthConvergenceLayer::EthConvergenceLayer()
00060 : ConvergenceLayer("EthConvergenceLayer", "eth")
00061 {
00062 defaults_.beacon_interval_ = 1;
00063 }
00064
00068 bool
00069 EthConvergenceLayer::parse_params(Params* params,
00070 int argc, const char* argv[],
00071 const char** invalidp)
00072 {
00073 oasys::OptParser p;
00074
00075 p.addopt(new oasys::UIntOpt("beacon_interval", ¶ms->beacon_interval_));
00076
00077 if (! p.parse(argc, argv, invalidp)) {
00078 return false;
00079 }
00080
00081 return true;
00082 }
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095 bool
00096 EthConvergenceLayer::interface_up(Interface* iface,
00097 int argc, const char* argv[])
00098 {
00099 Params params = EthConvergenceLayer::defaults_;
00100 const char *invalid;
00101 if (!parse_params(¶ms, argc, argv, &invalid)) {
00102 log_err("error parsing interface options: invalid option '%s'",
00103 invalid);
00104 return false;
00105 }
00106
00107
00108
00109
00110
00111 const char* if_name=iface->name().c_str()+strlen("string://");
00112 log_info("EthConvergenceLayer::interface_up(%s).", if_name);
00113
00114 Receiver* receiver = new Receiver(if_name, ¶ms);
00115 receiver->logpathf("/cl/eth");
00116 receiver->start();
00117 iface->set_cl_info(receiver);
00118
00119
00120 if_beacon_ = new Beacon(if_name, params.beacon_interval_);
00121 if_beacon_->logpathf("/cl/eth");
00122 if_beacon_->start();
00123
00124 return true;
00125 }
00126
00127 bool
00128 EthConvergenceLayer::interface_down(Interface* iface)
00129 {
00130
00131
00132
00133
00134
00135 if_beacon_->set_should_stop();
00136 while (! if_beacon_->is_stopped()) {
00137 oasys::Thread::yield();
00138 }
00139 delete if_beacon_;
00140
00141 Receiver *receiver = (Receiver *)iface->cl_info();
00142 receiver->set_should_stop();
00143
00144 while (! receiver->is_stopped()) {
00145 oasys::Thread::yield();
00146 }
00147 delete receiver;
00148
00149 return true;
00150 }
00151
00152 bool
00153 EthConvergenceLayer::open_contact(const ContactRef& contact)
00154 {
00155 LinkRef link = contact->link();
00156 ASSERT(link != NULL);
00157 ASSERT(!link->isdeleted());
00158 ASSERT(link->cl_info() != NULL);
00159
00160 log_debug("EthConvergenceLayer::open_contact: "
00161 "opening contact to link *%p", link.object());
00162
00163
00164 eth_addr_t addr;
00165 if (!EthernetScheme::parse(link->nexthop(), &addr)) {
00166 log_err("EthConvergenceLayer::open_contact: "
00167 "next hop address '%s' not a valid eth uri", link->nexthop());
00168 return false;
00169 }
00170
00171
00172 Sender* sender = new Sender(((EthCLInfo*)link->cl_info())->if_name_,
00173 link->contact());
00174 contact->set_cl_info(sender);
00175
00176 sender->logpathf("/cl/eth");
00177
00178 BundleDaemon::post(new ContactUpEvent(contact));
00179 return true;
00180 }
00181
00182 bool
00183 EthConvergenceLayer::close_contact(const ContactRef& contact)
00184 {
00185 Sender* sender = (Sender*)contact->cl_info();
00186
00187 log_info("close_contact *%p", contact.object());
00188
00189 if (sender) {
00190 contact->set_cl_info(NULL);
00191 delete sender;
00192 }
00193
00194 return true;
00195 }
00196
00197 void
00198 EthConvergenceLayer::delete_link(const LinkRef& link)
00199 {
00200 ASSERT(link != NULL);
00201 ASSERT(!link->isdeleted());
00202
00203 log_debug("EthConvergenceLayer::delete_link: "
00204 "deleting link %s", link->name());
00205
00206 if (link->cl_info() != NULL) {
00207 delete link->cl_info();
00208 link->set_cl_info(NULL);
00209 }
00210 }
00211
00215 void
00216 EthConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
00217 {
00218 ASSERT(link != NULL);
00219 ASSERT(!link->isdeleted());
00220
00221 const ContactRef& contact = link->contact();
00222 Sender* sender = (Sender*)contact->cl_info();
00223 if (!sender) {
00224 log_crit("send_bundles called on contact *%p with no Sender!!",
00225 contact.object());
00226 return;
00227 }
00228 ASSERT(contact == sender->contact_);
00229
00230 sender->send_bundle(bundle);
00231 }
00232
00233 bool
00234 EthConvergenceLayer::is_queued(const LinkRef& contact, Bundle* bundle)
00235 {
00236 (void)contact;
00237 (void)bundle;
00238
00240 return false;
00241 }
00242
00243
00244
00245
00246
00247
00248 EthConvergenceLayer::Receiver::Receiver(const char* if_name,
00249 EthConvergenceLayer::Params* params)
00250 : Logger("EthConvergenceLayer::Receiver", "/dtn/cl/eth/receiver"),
00251 Thread("EthConvergenceLayer::Receiver")
00252 {
00253 memset(if_name_,0, IFNAMSIZ);
00254 strcpy(if_name_,if_name);
00255 Thread::flags_ |= INTERRUPTABLE;
00256 (void)params;
00257 }
00258
00259 void
00260 EthConvergenceLayer::Receiver::process_data(u_char* bp, size_t len)
00261 {
00262 Bundle* bundle = NULL;
00263 EthCLHeader ethclhdr;
00264 size_t bundle_len;
00265 struct ether_header* ethhdr=(struct ether_header*)bp;
00266
00267 log_debug("Received DTN packet on interface %s, %zu.",if_name_, len);
00268
00269
00270 if (len < sizeof(EthCLHeader)) {
00271 log_err("process_data: "
00272 "incoming packet too small (len = %zu)", len);
00273 return;
00274 }
00275 memcpy(ðclhdr, bp+sizeof(struct ether_header), sizeof(EthCLHeader));
00276
00277
00278 if (ethclhdr.version != ETHCL_VERSION) {
00279 log_warn("remote sent version %d, expected version %d "
00280 "-- disconnecting.", ethclhdr.version, ETHCL_VERSION);
00281 return;
00282 }
00283
00284 if(ethclhdr.type == ETHCL_BEACON) {
00285 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00286
00287 char bundles_string[60];
00288 memset(bundles_string,0,60);
00289 EthernetScheme::to_string(ðhdr->ether_shost[0],
00290 bundles_string);
00291 char next_hop_string[50], *ptr;
00292 memset(next_hop_string,0,50);
00293 ptr = strrchr(bundles_string, '/');
00294 strcpy(next_hop_string, ptr+1);
00295
00296 ConvergenceLayer* cl = ConvergenceLayer::find_clayer("eth");
00297 EndpointID remote_eid(bundles_string);
00298
00299 LinkRef link = cm->find_link_to(cl,
00300 next_hop_string,
00301 remote_eid,
00302 Link::OPPORTUNISTIC);
00303
00304 if(link == NULL) {
00305 log_info("EthConvergenceLayer::Receiver::process_data: "
00306 "Discovered next_hop %s on interface %s.",
00307 next_hop_string, if_name_);
00308
00309
00310 link = cm->new_opportunistic_link(cl,
00311 next_hop_string,
00312 EndpointID(bundles_string));
00313
00314 if (link == NULL) {
00315 log_debug("EthConvergenceLayer::Receiver::process_data: "
00316 "failed to create new opportunistic link");
00317 return;
00318 }
00319
00320 oasys::ScopeLock l(link->lock(), "EthConvergenceLayer::Receiver");
00321
00322
00323 if (link->isdeleted()) {
00324 log_warn("EthConvergenceLayer::Receiver::process_data: "
00325 "link %s deleted before fully initialized",
00326 link->name());
00327 return;
00328 }
00329 ASSERT(link->cl_info() == NULL);
00330 link->set_cl_info(new EthCLInfo(if_name_));
00331 l.unlock();
00332 }
00333
00334 ASSERT(link != NULL);
00335 oasys::ScopeLock l(link->lock(), "EthConvergenceLayer::Receiver");
00336
00337 if (link->isdeleted()) {
00338 log_warn("EthConvergenceLayer::Receiver::process_data: "
00339 "link %s already deleted", link->name());
00340 return;
00341 }
00342
00343 ASSERT(link->cl_info() != NULL);
00344 ASSERT(strcmp(((EthCLInfo*)link->cl_info())->if_name_, if_name_) == 0);
00345
00346 if(!link->isavailable()) {
00347 log_info("EthConvergenceLayer::Receiver::process_data: "
00348 "Got beacon for previously unavailable link %s",
00349 link->name());
00350
00351
00352 log_err("XXX/demmer do something about link availability");
00353 }
00354
00360 BeaconTimer *timer = ((EthCLInfo*)link->cl_info())->timer;
00361 if (timer)
00362 timer->cancel();
00363
00364 timer = new BeaconTimer(next_hop_string);
00365 timer->schedule_in(ETHCL_BEACON_TIMEOUT_INTERVAL);
00366
00367 ((EthCLInfo*)link->cl_info())->timer = timer;
00368
00369 l.unlock();
00370 }
00371 else if(ethclhdr.type == ETHCL_BUNDLE) {
00372
00373
00374 bundle_len = len - sizeof(EthCLHeader) - sizeof(struct ether_header);
00375
00376 log_debug("process_data: got ethcl header -- bundle id %d, length %zu",
00377 ntohl(ethclhdr.bundle_id), bundle_len);
00378
00379
00380 bp += (sizeof(EthCLHeader) + sizeof(struct ether_header));
00381 len -= (sizeof(EthCLHeader) + sizeof(struct ether_header));
00382
00383 bundle = new Bundle();
00384 bool complete = false;
00385 int cc = BundleProtocol::consume(bundle, bp, len, &complete);
00386
00387 if (cc < 0) {
00388 log_err("process_data: bundle protocol error");
00389 delete bundle;
00390 return;
00391 }
00392
00393 if (!complete) {
00394 log_err("process_data: incomplete bundle");
00395 delete bundle;
00396 return;
00397 }
00398
00399 log_debug("process_data: new bundle id %d arrival, bundle length %zu",
00400 bundle->bundleid(), bundle_len);
00401
00402 BundleDaemon::post(
00403 new BundleReceivedEvent(bundle, EVENTSRC_PEER,
00404 bundle_len, EndpointID::NULL_EID()));
00405 }
00406 }
00407
00408 void
00409 EthConvergenceLayer::Receiver::run()
00410 {
00411 int sock;
00412 int cc;
00413 struct sockaddr_ll iface;
00414 unsigned char buffer[MAX_ETHER_PACKET];
00415
00416 if((sock = socket(PF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) {
00417 perror("socket");
00418 log_err("EthConvergenceLayer::Receiver::run() "
00419 "Couldn't open socket.");
00420 exit(1);
00421 }
00422
00423
00424 struct ifreq req;
00425 strcpy(req.ifr_name, if_name_);
00426 ioctl(sock, SIOCGIFINDEX, &req);
00427
00428 memset(&iface, 0, sizeof(iface));
00429 iface.sll_family=AF_PACKET;
00430 iface.sll_protocol=htons(ETHERTYPE_DTN);
00431 iface.sll_ifindex=req.ifr_ifindex;
00432
00433 if (bind(sock, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00434 perror("bind");
00435 exit(1);
00436 }
00437
00438 log_warn("Reading from socket...");
00439 while(true) {
00440 cc=read (sock, buffer, MAX_ETHER_PACKET);
00441 if(cc<=0) {
00442 perror("EthConvergenceLayer::Receiver::run()");
00443 exit(1);
00444 }
00445 struct ether_header* hdr=(struct ether_header*)buffer;
00446
00447 if(ntohs(hdr->ether_type)==ETHERTYPE_DTN) {
00448 process_data(buffer, cc);
00449 }
00450 else if(ntohs(hdr->ether_type)!=0x800)
00451 {
00452 log_err("Got non-DTN packet in Receiver, type %4X.",
00453 ntohs(hdr->ether_type));
00454
00455 }
00456
00457 if(should_stop())
00458 break;
00459 }
00460 }
00461
00462
00463
00464
00465
00466
00467
00471 EthConvergenceLayer::Sender::Sender(char* if_name,
00472 const ContactRef& contact)
00473 : Logger("EthConvergenceLayer::Sender", "/dtn/cl/eth/sender"),
00474 contact_(contact.object(), "EthConvergenceLayer::Sender")
00475 {
00476 struct ifreq req;
00477 struct sockaddr_ll iface;
00478 LinkRef link = contact->link();
00479
00480 memset(src_hw_addr_.octet, 0, 6);
00481 EthernetScheme::parse(link->nexthop(), &dst_hw_addr_);
00482
00483 strcpy(if_name_, if_name);
00484 sock_ = 0;
00485
00486 memset(&req, 0, sizeof(req));
00487 memset(&iface, 0, sizeof(iface));
00488
00489
00490
00491
00492 if((sock_ = socket(AF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) {
00493 perror("socket");
00494 exit(1);
00495 }
00496
00497
00498 strcpy(req.ifr_name, if_name_);
00499
00500
00501 ioctl(sock_, SIOCGIFINDEX, &req);
00502
00503 iface.sll_family=AF_PACKET;
00504 iface.sll_protocol=htons(ETHERTYPE_DTN);
00505 iface.sll_ifindex=req.ifr_ifindex;
00506
00507
00508 if(ioctl(sock_, SIOCGIFHWADDR, &req))
00509 {
00510 perror("ioctl");
00511 exit(1);
00512 }
00513 memcpy(src_hw_addr_.octet,req.ifr_hwaddr.sa_data,6);
00514
00515 if (bind(sock_, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00516 perror("bind");
00517 exit(1);
00518 }
00519 }
00520
00521
00522
00523
00524 bool
00525 EthConvergenceLayer::Sender::send_bundle(const BundleRef& bundle)
00526 {
00527 int cc;
00528 struct iovec iov[3];
00529
00530 EthCLHeader ethclhdr;
00531 struct ether_header hdr;
00532
00533 memset(iov,0,sizeof(iov));
00534
00535
00536
00537 iov[0].iov_base = (char*)&hdr;
00538 iov[0].iov_len = sizeof(struct ether_header);
00539
00540
00541
00542 memcpy(hdr.ether_dhost,dst_hw_addr_.octet,6);
00543 memcpy(hdr.ether_shost,src_hw_addr_.octet,6);
00544 hdr.ether_type=htons(ETHERTYPE_DTN);
00545
00546
00547
00548 iov[1].iov_base = (char*)ðclhdr;
00549 iov[1].iov_len = sizeof(EthCLHeader);
00550
00551
00552
00553 ethclhdr.version = ETHCL_VERSION;
00554 ethclhdr.type = ETHCL_BUNDLE;
00555 ethclhdr.bundle_id = htonl(bundle->bundleid());
00556
00557
00558 BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(contact_->link());
00559 ASSERT(blocks != NULL);
00560
00561 bool complete = false;
00562 size_t total_len = BundleProtocol::produce(bundle.object(), blocks,
00563 buf_, 0, sizeof(buf_),
00564 &complete);
00565 if (!complete) {
00566 size_t formatted_len = BundleProtocol::total_length(blocks);
00567 log_err("send_bundle: bundle too big (%zu > %u)",
00568 formatted_len, MAX_ETHER_PACKET);
00569 return -1;
00570 }
00571
00572 iov[2].iov_base = (char *)buf_;
00573 iov[2].iov_len = total_len;
00574
00575
00576
00577 log_info("Sending bundle out interface %s",if_name_);
00578
00579 cc=IO::writevall(sock_, iov, 3);
00580 if(cc<0) {
00581 perror("send");
00582 log_err("Send failed!\n");
00583 }
00584 log_info("Sent packet, size: %d",cc );
00585
00586
00587 contact_->link()->del_from_queue(bundle, total_len);
00588 contact_->link()->add_to_inflight(bundle, total_len);
00589
00590
00591 bool ok;
00592 int total = sizeof(EthCLHeader) + sizeof(struct ether_header) + total_len;
00593 if (cc != total) {
00594 log_err("send_bundle: error writing bundle (wrote %d/%d): %s",
00595 cc, total, strerror(errno));
00596 ok = false;
00597 } else {
00598
00599
00600
00601 BundleDaemon::post(
00602 new BundleTransmittedEvent(bundle.object(), contact_,contact_->link(),
00603 total_len, false));
00604 ok = true;
00605 }
00606
00607 return ok;
00608 }
00609
00610 EthConvergenceLayer::Beacon::Beacon(const char* if_name,
00611 unsigned int beacon_interval)
00612 : Logger("EthConvergenceLayer::Beacon", "/dtn/cl/eth/beacon"),
00613 Thread("EthConvergenceLayer::Beacon")
00614 {
00615 Thread::flags_ |= INTERRUPTABLE;
00616 memset(if_name_, 0, IFNAMSIZ);
00617 strcpy(if_name_, if_name);
00618 beacon_interval_ = beacon_interval;
00619 }
00620
00621 void EthConvergenceLayer::Beacon::run()
00622 {
00623
00624 char bcast_mac_addr[6]={0xff,0xff,0xff,0xff,0xff,0xff};
00625
00626 struct ether_header hdr;
00627 struct sockaddr_ll iface;
00628 EthCLHeader ethclhdr;
00629
00630 int sock,cc;
00631 struct iovec iov[2];
00632
00633 memset(&hdr,0,sizeof(hdr));
00634 memset(ðclhdr,0,sizeof(ethclhdr));
00635 memset(&iface,0,sizeof(iface));
00636
00637 ethclhdr.version = ETHCL_VERSION;
00638 ethclhdr.type = ETHCL_BEACON;
00639
00640 hdr.ether_type=htons(ETHERTYPE_DTN);
00641
00642
00643 iov[0].iov_base = (char*)&hdr;
00644 iov[0].iov_len = sizeof(struct ether_header);
00645
00646
00647 iov[1].iov_base = (char*)ðclhdr;
00648 iov[1].iov_len = sizeof(EthCLHeader);
00649
00650
00651
00652
00653 if((sock = socket(AF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) {
00654 perror("socket");
00655 exit(1);
00656 }
00657
00658 struct ifreq req;
00659 strcpy(req.ifr_name, if_name_);
00660 if(ioctl(sock, SIOCGIFINDEX, &req))
00661 {
00662 perror("ioctl");
00663 exit(1);
00664 }
00665
00666 iface.sll_ifindex=req.ifr_ifindex;
00667
00668 if(ioctl(sock, SIOCGIFHWADDR, &req))
00669 {
00670 perror("ioctl");
00671 exit(1);
00672 }
00673
00674 memcpy(hdr.ether_dhost,bcast_mac_addr,6);
00675 memcpy(hdr.ether_shost,req.ifr_hwaddr.sa_data,6);
00676
00677 log_info("Interface %s has interface number %d.",if_name_,req.ifr_ifindex);
00678
00679 iface.sll_family=AF_PACKET;
00680 iface.sll_protocol=htons(ETHERTYPE_DTN);
00681
00682 if (bind(sock, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00683 perror("bind");
00684 exit(1);
00685 }
00686
00687
00688
00689
00690 while(1) {
00691 sleep(beacon_interval_);
00692
00693 if (should_stop())
00694 break;
00695
00696 log_debug("Sent beacon out interface %s.\n",if_name_ );
00697
00698 cc=IO::writevall(sock, iov, 2);
00699 if(cc<0) {
00700 perror("send beacon");
00701 log_err("Send beacon failed!\n");
00702 }
00703 }
00704 }
00705
00706 EthConvergenceLayer::BeaconTimer::BeaconTimer(char * next_hop)
00707 : Logger("EthConvergenceLayer::BeaconTimer", "/dtn/cl/eth/beacontimer")
00708 {
00709 next_hop_=(char*)malloc(strlen(next_hop)+1);
00710 strcpy(next_hop_, next_hop);
00711 }
00712
00713 EthConvergenceLayer::BeaconTimer::~BeaconTimer()
00714 {
00715 free(next_hop_);
00716 }
00717
00718 void
00719 EthConvergenceLayer::BeaconTimer::timeout(const struct timeval& now)
00720 {
00721 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00722 ConvergenceLayer* cl = ConvergenceLayer::find_clayer("eth");
00723 LinkRef link = cm->find_link_to(cl, next_hop_);
00724
00725 (void)now;
00726
00727 log_info("Neighbor %s timer expired.",next_hop_);
00728
00729 if(link == NULL) {
00730 log_warn("No link for next_hop %s.",next_hop_);
00731 }
00732 else if(link->isopen()) {
00733 BundleDaemon::post(
00734 new LinkStateChangeRequest(link, Link::CLOSED,
00735 ContactDownEvent::BROKEN));
00736 }
00737 else {
00738 log_warn("next_hop %s unexpectedly not open",next_hop_);
00739 }
00740 }
00741
00742 Timer *
00743 EthConvergenceLayer::BeaconTimer::copy()
00744 {
00745 return new BeaconTimer(*this);
00746 }
00747
00748 }
00749
00750 #endif // __linux