00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <sys/poll.h>
00022
00023 #include <oasys/io/NetUtils.h>
00024 #include <oasys/thread/Timer.h>
00025 #include <oasys/util/OptParser.h>
00026 #include <oasys/util/StringBuffer.h>
00027
00028 #include "UDPConvergenceLayer.h"
00029 #include "bundling/Bundle.h"
00030 #include "bundling/BundleEvent.h"
00031 #include "bundling/BundleDaemon.h"
00032 #include "bundling/BundleList.h"
00033 #include "bundling/BundleProtocol.h"
00034
00035 namespace dtn {
00036
00037 struct UDPConvergenceLayer::Params UDPConvergenceLayer::defaults_;
00038
00039
00040 void
00041 UDPConvergenceLayer::Params::serialize(oasys::SerializeAction *a)
00042 {
00043 a->process("local_addr", oasys::InAddrPtr(&local_addr_));
00044 a->process("remote_addr", oasys::InAddrPtr(&remote_addr_));
00045 a->process("local_port", &local_port_);
00046 a->process("remote_port", &remote_port_);
00047 a->process("rate", &rate_);
00048 a->process("bucket_depth", &bucket_depth_);
00049 }
00050
00051
00052 UDPConvergenceLayer::UDPConvergenceLayer()
00053 : IPConvergenceLayer("UDPConvergenceLayer", "udp")
00054 {
00055 defaults_.local_addr_ = INADDR_ANY;
00056 defaults_.local_port_ = UDPCL_DEFAULT_PORT;
00057 defaults_.remote_addr_ = INADDR_NONE;
00058 defaults_.remote_port_ = 0;
00059 defaults_.rate_ = 0;
00060 defaults_.bucket_depth_ = 0;
00061 }
00062
00063
00064 bool
00065 UDPConvergenceLayer::parse_params(Params* params,
00066 int argc, const char** argv,
00067 const char** invalidp)
00068 {
00069 oasys::OptParser p;
00070
00071 p.addopt(new oasys::InAddrOpt("local_addr", ¶ms->local_addr_));
00072 p.addopt(new oasys::UInt16Opt("local_port", ¶ms->local_port_));
00073 p.addopt(new oasys::InAddrOpt("remote_addr", ¶ms->remote_addr_));
00074 p.addopt(new oasys::UInt16Opt("remote_port", ¶ms->remote_port_));
00075 p.addopt(new oasys::UIntOpt("rate", ¶ms->rate_));
00076 p.addopt(new oasys::UIntOpt("bucket_depth_", ¶ms->bucket_depth_));
00077
00078 if (! p.parse(argc, argv, invalidp)) {
00079 return false;
00080 }
00081
00082 return true;
00083 };
00084
00085
00086 bool
00087 UDPConvergenceLayer::interface_up(Interface* iface,
00088 int argc, const char* argv[])
00089 {
00090 log_debug("adding interface %s", iface->name().c_str());
00091
00092
00093
00094 Params params = UDPConvergenceLayer::defaults_;
00095 const char* invalid;
00096 if (!parse_params(¶ms, argc, argv, &invalid)) {
00097 log_err("error parsing interface options: invalid option '%s'",
00098 invalid);
00099 return false;
00100 }
00101
00102
00103 if (params.local_addr_ == INADDR_NONE) {
00104 log_err("invalid local address setting of 0");
00105 return false;
00106 }
00107
00108 if (params.local_port_ == 0) {
00109 log_err("invalid local port setting of 0");
00110 return false;
00111 }
00112
00113
00114 Receiver* receiver = new Receiver(¶ms);
00115 receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00116
00117 if (receiver->bind(params.local_addr_, params.local_port_) != 0) {
00118 return false;
00119 }
00120
00121
00122 if (params.remote_addr_ != INADDR_NONE) {
00123 if (receiver->connect(params.remote_addr_, params.remote_port_) != 0) {
00124 return false;
00125 }
00126 }
00127
00128
00129 receiver->start();
00130
00131
00132
00133 iface->set_cl_info(receiver);
00134
00135 return true;
00136 }
00137
00138
00139 bool
00140 UDPConvergenceLayer::interface_down(Interface* iface)
00141 {
00142
00143
00144
00145
00146 Receiver* receiver = (Receiver*)iface->cl_info();
00147 receiver->set_should_stop();
00148 receiver->interrupt_from_io();
00149
00150 while (! receiver->is_stopped()) {
00151 oasys::Thread::yield();
00152 }
00153
00154 delete receiver;
00155 return true;
00156 }
00157
00158
00159 void
00160 UDPConvergenceLayer::dump_interface(Interface* iface,
00161 oasys::StringBuffer* buf)
00162 {
00163 Params* params = &((Receiver*)iface->cl_info())->params_;
00164
00165 buf->appendf("\tlocal_addr: %s local_port: %d\n",
00166 intoa(params->local_addr_), params->local_port_);
00167
00168 if (params->remote_addr_ != INADDR_NONE) {
00169 buf->appendf("\tconnected remote_addr: %s remote_port: %d\n",
00170 intoa(params->remote_addr_), params->remote_port_);
00171 } else {
00172 buf->appendf("\tnot connected\n");
00173 }
00174 }
00175
00176
00177 bool
00178 UDPConvergenceLayer::init_link(const LinkRef& link,
00179 int argc, const char* argv[])
00180 {
00181 in_addr_t addr;
00182 u_int16_t port = 0;
00183
00184 ASSERT(link != NULL);
00185 ASSERT(!link->isdeleted());
00186 ASSERT(link->cl_info() == NULL);
00187
00188 log_debug("adding %s link %s", link->type_str(), link->nexthop());
00189
00190
00191
00192
00193 parse_nexthop(link->nexthop(), &addr, &port);
00194
00195
00196
00197 Params* params = new Params(defaults_);
00198 params->local_addr_ = INADDR_NONE;
00199 params->local_port_ = 0;
00200
00201 const char* invalid;
00202 if (! parse_params(params, argc, argv, &invalid)) {
00203 log_err("error parsing link options: invalid option '%s'", invalid);
00204 delete params;
00205 return false;
00206 }
00207
00208 if (link->params().mtu_ > MAX_BUNDLE_LEN) {
00209 log_err("error parsing link options: mtu %d > maximum %d",
00210 link->params().mtu_, MAX_BUNDLE_LEN);
00211 delete params;
00212 return false;
00213 }
00214
00215 link->set_cl_info(params);
00216 return true;
00217 }
00218
00219
00220 void
00221 UDPConvergenceLayer::delete_link(const LinkRef& link)
00222 {
00223 ASSERT(link != NULL);
00224 ASSERT(!link->isdeleted());
00225 ASSERT(link->cl_info() != NULL);
00226
00227 log_debug("UDPConvergenceLayer::delete_link: "
00228 "deleting link %s", link->name());
00229
00230 delete link->cl_info();
00231 link->set_cl_info(NULL);
00232 }
00233
00234
00235 void
00236 UDPConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
00237 {
00238 ASSERT(link != NULL);
00239 ASSERT(!link->isdeleted());
00240 ASSERT(link->cl_info() != NULL);
00241
00242 Params* params = (Params*)link->cl_info();
00243
00244 buf->appendf("\tlocal_addr: %s local_port: %d\n",
00245 intoa(params->local_addr_), params->local_port_);
00246
00247 buf->appendf("\tremote_addr: %s remote_port: %d\n",
00248 intoa(params->remote_addr_), params->remote_port_);
00249 }
00250
00251
00252 bool
00253 UDPConvergenceLayer::open_contact(const ContactRef& contact)
00254 {
00255 in_addr_t addr;
00256 u_int16_t port;
00257
00258 LinkRef link = contact->link();
00259 ASSERT(link != NULL);
00260 ASSERT(!link->isdeleted());
00261 ASSERT(link->cl_info() != NULL);
00262
00263 log_debug("UDPConvergenceLayer::open_contact: "
00264 "opening contact for link *%p", link.object());
00265
00266
00267 if (! parse_nexthop(link->nexthop(), &addr, &port)) {
00268 log_err("invalid next hop address '%s'", link->nexthop());
00269 return false;
00270 }
00271
00272
00273 if (addr == INADDR_ANY || addr == INADDR_NONE) {
00274 log_err("can't lookup hostname in next hop address '%s'",
00275 link->nexthop());
00276 return false;
00277 }
00278
00279
00280 if (port == 0) {
00281 port = UDPCL_DEFAULT_PORT;
00282 }
00283
00284 Params* params = (Params*)link->cl_info();
00285
00286
00287 Sender* sender = new Sender(link->contact());
00288
00289 if (!sender->init(params, addr, port)) {
00290 log_err("error initializing contact");
00291 BundleDaemon::post(
00292 new LinkStateChangeRequest(link, Link::UNAVAILABLE,
00293 ContactEvent::NO_INFO));
00294 delete sender;
00295 return false;
00296 }
00297
00298 contact->set_cl_info(sender);
00299 BundleDaemon::post(new ContactUpEvent(link->contact()));
00300
00301
00302
00303
00304 return true;
00305 }
00306
00307
00308 bool
00309 UDPConvergenceLayer::close_contact(const ContactRef& contact)
00310 {
00311 Sender* sender = (Sender*)contact->cl_info();
00312
00313 log_info("close_contact *%p", contact.object());
00314
00315 if (sender) {
00316 delete sender;
00317 contact->set_cl_info(NULL);
00318 }
00319
00320 return true;
00321 }
00322
00323
00324 void
00325 UDPConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
00326 {
00327 ASSERT(link != NULL);
00328 ASSERT(!link->isdeleted());
00329
00330 const ContactRef& contact = link->contact();
00331 Sender* sender = (Sender*)contact->cl_info();
00332 if (!sender) {
00333 log_crit("send_bundles called on contact *%p with no Sender!!",
00334 contact.object());
00335 return;
00336 }
00337 ASSERT(contact == sender->contact_);
00338
00339 int len = sender->send_bundle(bundle);
00340
00341 if (len > 0) {
00342 link->del_from_queue(bundle, len);
00343 link->add_to_inflight(bundle, len);
00344 BundleDaemon::post(
00345 new BundleTransmittedEvent(bundle.object(), contact, link, len, 0));
00346 }
00347 }
00348
00349
00350 UDPConvergenceLayer::Receiver::Receiver(UDPConvergenceLayer::Params* params)
00351 : IOHandlerBase(new oasys::Notifier("/dtn/cl/udp/receiver")),
00352 UDPClient("/dtn/cl/udp/receiver"),
00353 Thread("UDPConvergenceLayer::Receiver")
00354 {
00355 logfd_ = false;
00356 params_ = *params;
00357 }
00358
00359
00360 void
00361 UDPConvergenceLayer::Receiver::process_data(u_char* bp, size_t len)
00362 {
00363
00364 Bundle* bundle = new Bundle();
00365
00366 bool complete = false;
00367 int cc = BundleProtocol::consume(bundle, bp, len, &complete);
00368
00369 if (cc < 0) {
00370 log_err("process_data: bundle protocol error");
00371 delete bundle;
00372 return;
00373 }
00374
00375 if (!complete) {
00376 log_err("process_data: incomplete bundle");
00377 delete bundle;
00378 return;
00379 }
00380
00381 log_debug("process_data: new bundle id %d arrival, length %zu (payload %zu)",
00382 bundle->bundleid(), len, bundle->payload().length());
00383
00384 BundleDaemon::post(
00385 new BundleReceivedEvent(bundle, EVENTSRC_PEER, len, EndpointID::NULL_EID()));
00386 }
00387
00388
00389 void
00390 UDPConvergenceLayer::Receiver::run()
00391 {
00392 int ret;
00393 in_addr_t addr;
00394 u_int16_t port;
00395 u_char buf[MAX_UDP_PACKET];
00396
00397 while (1) {
00398 if (should_stop())
00399 break;
00400
00401 ret = recvfrom((char*)buf, MAX_UDP_PACKET, 0, &addr, &port);
00402 if (ret <= 0) {
00403 if (errno == EINTR) {
00404 continue;
00405 }
00406 log_err("error in recvfrom(): %d %s",
00407 errno, strerror(errno));
00408 close();
00409 break;
00410 }
00411
00412 log_debug("got %d byte packet from %s:%d",
00413 ret, intoa(addr), port);
00414 process_data(buf, ret);
00415 }
00416 }
00417
00418
00419 UDPConvergenceLayer::Sender::Sender(const ContactRef& contact)
00420 : Logger("UDPConvergenceLayer::Sender",
00421 "/dtn/cl/udp/sender/%p", this),
00422 socket_(logpath_),
00423 rate_socket_(logpath_, 0, 0),
00424 contact_(contact.object(), "UDPCovergenceLayer::Sender")
00425 {
00426 }
00427
00428
00429 bool
00430 UDPConvergenceLayer::Sender::init(Params* params,
00431 in_addr_t addr, u_int16_t port)
00432
00433 {
00434 log_debug("initializing sender");
00435
00436 params_ = params;
00437
00438 socket_.logpathf("%s/conn/%s:%d", logpath_, intoa(addr), port);
00439 socket_.set_logfd(false);
00440
00441 if (params->local_addr_ != INADDR_NONE || params->local_port_ != 0)
00442 {
00443 if (socket_.bind(params->local_addr_, params->local_port_) != 0) {
00444 log_err("error binding to %s:%d: %s",
00445 intoa(params->local_addr_), params->local_port_,
00446 strerror(errno));
00447 return false;
00448 }
00449 }
00450
00451 if (socket_.connect(addr, port) != 0) {
00452 log_err("error issuing udp connect to %s:%d: %s",
00453 intoa(addr), port, strerror(errno));
00454 return false;
00455 }
00456
00457 if (params->rate_ != 0) {
00458 rate_socket_.bucket()->set_rate(params->rate_);
00459
00460 if (params->bucket_depth_ != 0) {
00461 rate_socket_.bucket()->set_depth(params->bucket_depth_);
00462 }
00463
00464 log_debug("initialized rate controller: rate %llu depth %llu",
00465 U64FMT(rate_socket_.bucket()->rate()),
00466 U64FMT(rate_socket_.bucket()->depth()));
00467 }
00468
00469 return true;
00470 }
00471
00472
00473 int
00474 UDPConvergenceLayer::Sender::send_bundle(const BundleRef& bundle)
00475 {
00476 BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(contact_->link());
00477 ASSERT(blocks != NULL);
00478
00479 bool complete = false;
00480 size_t total_len = BundleProtocol::produce(bundle.object(), blocks,
00481 buf_, 0, sizeof(buf_),
00482 &complete);
00483 if (!complete) {
00484 size_t formatted_len = BundleProtocol::total_length(blocks);
00485 log_err("send_bundle: bundle too big (%zu > %u)",
00486 formatted_len, UDPConvergenceLayer::MAX_BUNDLE_LEN);
00487 return -1;
00488 }
00489
00490
00491 int cc = socket_.write((char*)buf_, total_len);
00492 if (cc == (int)total_len) {
00493 log_info("send_bundle: successfully sent bundle length %d", cc);
00494 return total_len;
00495 } else {
00496 log_err("send_bundle: error sending bundle (wrote %d/%zu): %s",
00497 cc, total_len, strerror(errno));
00498 return -1;
00499 }
00500 }
00501
00502 }