00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <oasys/util/OptParser.h>
00022
00023 #include "Link.h"
00024 #include "ContactManager.h"
00025 #include "AlwaysOnLink.h"
00026 #include "OndemandLink.h"
00027 #include "ScheduledLink.h"
00028 #include "OpportunisticLink.h"
00029
00030 #include "bundling/BundleDaemon.h"
00031 #include "bundling/BundleEvent.h"
00032 #include "conv_layers/ConvergenceLayer.h"
00033 #include "naming/EndpointIDOpt.h"
00034 #include "routing/RouterInfo.h"
00035
00036 namespace dtn {
00037
00038
00040 Link::Params::Params()
00041 : mtu_(0),
00042 min_retry_interval_(5),
00043 max_retry_interval_(10 * 60),
00044 idle_close_time_(0),
00045 potential_downtime_(30),
00046 prevhop_hdr_(false),
00047 cost_(100),
00048 qlimit_bundles_high_(10),
00049 qlimit_bytes_high_(1024*1024),
00050 qlimit_bundles_low_(5),
00051 qlimit_bytes_low_(512*1024)
00052 {}
00053
00054 Link::Params Link::default_params_;
00055
00056
00057 LinkRef
00058 Link::create_link(const std::string& name, link_type_t type,
00059 ConvergenceLayer* cl, const char* nexthop,
00060 int argc, const char* argv[],
00061 const char** invalid_argp)
00062 {
00063 LinkRef link("Link::create_link: return value");
00064 switch(type) {
00065 case ALWAYSON: link = new AlwaysOnLink(name, cl, nexthop); break;
00066 case ONDEMAND: link = new OndemandLink(name, cl, nexthop); break;
00067 case SCHEDULED: link = new ScheduledLink(name, cl, nexthop); break;
00068 case OPPORTUNISTIC: link = new OpportunisticLink(name, cl, nexthop); break;
00069 default: PANIC("bogus link_type_t");
00070 }
00071
00072
00073
00074 int count = link->parse_args(argc, argv, invalid_argp);
00075 if (count == -1) {
00076 link->deleted_ = true;
00077 link = NULL;
00078 return link;
00079 }
00080
00081 argc -= count;
00082
00083
00084
00085
00086
00087 ASSERT(link->clayer_);
00088 if (!link->clayer_->init_link(link, argc, argv)) {
00089 link->deleted_ = true;
00090 link = NULL;
00091 return link;
00092 }
00093
00094 link->logf(oasys::LOG_INFO, "new link *%p", link.object());
00095
00096
00097
00098
00099
00100
00101
00102
00103 return link;
00104 }
00105
00106
00107 Link::Link(const std::string& name, link_type_t type,
00108 ConvergenceLayer* cl, const char* nexthop)
00109 : RefCountedObject("/dtn/link/refs"),
00110 Logger("Link", "/dtn/link/%s", name.c_str()),
00111 type_(type),
00112 state_(UNAVAILABLE),
00113 deleted_(false),
00114 create_pending_(false),
00115 usable_(true),
00116 nexthop_(nexthop),
00117 name_(name),
00118 reliable_(false),
00119 lock_(),
00120 queue_(name + ":queue", &lock_),
00121 inflight_(name + ":inflight", &lock_),
00122 bundles_queued_(0),
00123 bytes_queued_(0),
00124 bundles_inflight_(0),
00125 bytes_inflight_(0),
00126 contact_("Link"),
00127 clayer_(cl),
00128 cl_info_(NULL),
00129 router_info_(NULL),
00130 remote_eid_(EndpointID::NULL_EID())
00131 {
00132 ASSERT(clayer_);
00133
00134 params_ = default_params_;
00135 retry_interval_ = 0;
00136
00137 memset(&stats_, 0, sizeof(Stats));
00138 }
00139
00140
00141 Link::Link(const oasys::Builder&)
00142 : RefCountedObject("/dtn/link/refs"),
00143 Logger("Link", "/dtn/link/UNKNOWN!!!"),
00144 type_(LINK_INVALID),
00145 state_(UNAVAILABLE),
00146 deleted_(false),
00147 create_pending_(false),
00148 usable_(false),
00149 nexthop_(""),
00150 name_(""),
00151 reliable_(false),
00152 lock_(),
00153 queue_("", &lock_),
00154 inflight_("", &lock_),
00155 bundles_queued_(0),
00156 bytes_queued_(0),
00157 bundles_inflight_(0),
00158 bytes_inflight_(0),
00159 contact_("Link"),
00160 clayer_(NULL),
00161 cl_info_(NULL),
00162 router_info_(NULL),
00163 remote_eid_(EndpointID::NULL_EID())
00164 {
00165 }
00166
00167
00168 void
00169 Link::delete_link()
00170 {
00171 oasys::ScopeLock l(&lock_, "Link::delete_link");
00172
00173 ASSERT(!isdeleted());
00174 ASSERT(clayer_ != NULL);
00175
00176 clayer_->delete_link(LinkRef(this, "Link::delete_link"));
00177 deleted_ = true;
00178 }
00179
00180
00181 bool
00182 Link::isdeleted() const
00183 {
00184 oasys::ScopeLock l(&lock_, "Link::delete_link");
00185 return deleted_;
00186 }
00187
00188
00189 bool
00190 Link::reconfigure_link(int argc, const char* argv[])
00191 {
00192 oasys::ScopeLock l(&lock_, "Link::reconfigure_link");
00193
00194 if (isdeleted()) {
00195 log_debug("Link::reconfigure_link: "
00196 "cannot reconfigure deleted link %s", name());
00197 return false;
00198 }
00199
00200 ASSERT(clayer_ != NULL);
00201 return clayer_->reconfigure_link(LinkRef(this, "Link::reconfigure_link"),
00202 argc, argv);
00203 }
00204
00205
00206 void
00207 Link::reconfigure_link(AttributeVector& params)
00208 {
00209 oasys::ScopeLock l(&lock_, "Link::reconfigure_link");
00210
00211 if (isdeleted()) {
00212 log_debug("Link::reconfigure_link: "
00213 "cannot reconfigure deleted link %s", name());
00214 return;
00215 }
00216
00217 AttributeVector::iterator iter;
00218 for (iter = params.begin(); iter != params.end(); ) {
00219 if (iter->name() == "is_usable") {
00220 if (iter->bool_val()) {
00221 set_usable(true);
00222 } else {
00223 set_usable(false);
00224 }
00225 ++iter;
00226
00227 } else if (iter->name() == "nexthop") {
00228 set_nexthop(iter->string_val());
00229 ++iter;
00230
00231
00232 } else if (iter->name() == "min_retry_interval") {
00233 params_.min_retry_interval_ = iter->u_int_val();
00234 iter = params.erase(iter);
00235
00236 } else if (iter->name() == "max_retry_interval") {
00237 params_.max_retry_interval_ = iter->u_int_val();
00238 iter = params.erase(iter);
00239
00240 } else if (iter->name() == "idle_close_time") {
00241 params_.idle_close_time_ = iter->u_int_val();
00242 iter = params.erase(iter);
00243
00244 } else if (iter->name() == "potential_downtime") {
00245 params_.potential_downtime_ = iter->u_int_val();
00246 iter = params.erase(iter);
00247
00248 } else {
00249 ++iter;
00250 }
00251 }
00252
00253 ASSERT(clayer_ != NULL);
00254 return clayer_->reconfigure_link(
00255 LinkRef(this, "Link::reconfigure_link"), params);
00256 }
00257
00258
00259 void
00260 Link::serialize(oasys::SerializeAction* a)
00261 {
00262 std::string cl_name;
00263 std::string type_str;
00264
00265 if (a->action_code() == oasys::Serialize::UNMARSHAL) {
00266 a->process("type", &type_str);
00267 type_ = str_to_link_type(type_str.c_str());
00268 ASSERT(type_ != LINK_INVALID);
00269 } else {
00270 type_str = link_type_to_str(type());
00271 a->process("type", &type_str);
00272 }
00273
00274 a->process("nexthop", &nexthop_);
00275 a->process("name", &name_);
00276 a->process("state", &state_);
00277 a->process("deleted", &deleted_);
00278 a->process("usable", &usable_);
00279 a->process("reliable", &reliable_);
00280
00281 if (a->action_code() == oasys::Serialize::UNMARSHAL) {
00282 a->process("clayer", &cl_name);
00283 clayer_ = ConvergenceLayer::find_clayer(cl_name.c_str());
00284 ASSERT(clayer_);
00285 } else {
00286 cl_name = clayer_->name();
00287 a->process("clayer", &cl_name);
00288 if (state_ == OPEN)
00289 a->process("clinfo", contact_->cl_info());
00290 }
00291
00292
00293
00294 a->process("remote_eid", &remote_eid_);
00295 a->process("min_retry_interval", ¶ms_.min_retry_interval_);
00296 a->process("max_retry_interval", ¶ms_.max_retry_interval_);
00297 a->process("idle_close_time", ¶ms_.idle_close_time_);
00298 a->process("potential_downtime", ¶ms_.potential_downtime_);
00299 a->process("cost", ¶ms_.cost_);
00300
00301 if (a->action_code() == oasys::Serialize::UNMARSHAL) {
00302 logpathf("/dtn/link/%s", name_.c_str());
00303 }
00304 }
00305
00306
00307 int
00308 Link::parse_args(int argc, const char* argv[], const char** invalidp)
00309 {
00310 oasys::OptParser p;
00311
00312 p.addopt(new dtn::EndpointIDOpt("remote_eid", &remote_eid_));
00313 p.addopt(new oasys::BoolOpt("reliable", &reliable_));
00314 p.addopt(new oasys::StringOpt("nexthop", &nexthop_));
00315 p.addopt(new oasys::UIntOpt("mtu", ¶ms_.mtu_));
00316 p.addopt(new oasys::UIntOpt("min_retry_interval",
00317 ¶ms_.min_retry_interval_));
00318 p.addopt(new oasys::UIntOpt("max_retry_interval",
00319 ¶ms_.max_retry_interval_));
00320 p.addopt(new oasys::UIntOpt("idle_close_time",
00321 ¶ms_.idle_close_time_));
00322 p.addopt(new oasys::UIntOpt("potential_downtime",
00323 ¶ms_.potential_downtime_));
00324 p.addopt(new oasys::BoolOpt("prevhop_hdr", ¶ms_.prevhop_hdr_));
00325 p.addopt(new oasys::UIntOpt("cost", ¶ms_.cost_));
00326 p.addopt(new oasys::UIntOpt("qlimit_bundles_high",
00327 ¶ms_.qlimit_bundles_high_));
00328 p.addopt(new oasys::SizeOpt("qlimit_bytes_high",
00329 ¶ms_.qlimit_bytes_high_));
00330 p.addopt(new oasys::UIntOpt("qlimit_bundles_low",
00331 ¶ms_.qlimit_bundles_low_));
00332 p.addopt(new oasys::SizeOpt("qlimit_bytes_low",
00333 ¶ms_.qlimit_bytes_low_));
00334
00335 int ret = p.parse_and_shift(argc, argv, invalidp);
00336 if (ret == -1) {
00337 return -1;
00338 }
00339
00340 if (params_.min_retry_interval_ == 0 ||
00341 params_.max_retry_interval_ == 0)
00342 {
00343 *invalidp = "invalid retry interval";
00344 return -1;
00345 }
00346
00347 if (params_.idle_close_time_ != 0 && type_ == ALWAYSON)
00348 {
00349 *invalidp = "idle_close_time must be zero for always on link";
00350 return -1;
00351 }
00352
00353 return ret;
00354 }
00355
00356
00357 void
00358 Link::set_initial_state()
00359 {
00360 }
00361
00362
00363 Link::~Link()
00364 {
00365 log_debug("destroying link %s", name());
00366
00367 ASSERT(!isopen());
00368 ASSERT(cl_info_ == NULL);
00369 ASSERT(router_info_ == NULL);
00370 }
00371
00372
00373 void
00374 Link::set_state(state_t new_state)
00375 {
00376 log_debug("set_state %s -> %s",
00377 state_to_str(state()),
00378 state_to_str(new_state));
00379
00380 #define ASSERT_STATE(condition) \
00381 if (!(condition)) { \
00382 log_err("set_state %s -> %s: expected %s", \
00383 state_to_str(state()), \
00384 state_to_str(new_state), \
00385 #condition); \
00386 }
00387
00388 switch(new_state) {
00389 case UNAVAILABLE:
00390 break;
00391
00392 case AVAILABLE:
00393 ASSERT_STATE(state_ == OPEN || state_ == UNAVAILABLE);
00394 break;
00395
00396 case OPENING:
00397 ASSERT_STATE(state_ == AVAILABLE || state_ == UNAVAILABLE);
00398 break;
00399
00400 case OPEN:
00401 ASSERT_STATE(state_ == OPENING ||
00402 state_ == UNAVAILABLE );
00403 break;
00404
00405 default:
00406 NOTREACHED;
00407 }
00408 #undef ASSERT_STATE
00409
00410 state_ = new_state;
00411 }
00412
00413
00414 void
00415 Link::open()
00416 {
00417 ASSERT(!isdeleted());
00418
00419 if (state_ != AVAILABLE) {
00420 log_crit("Link::open: in state %s: expected state AVAILABLE",
00421 state_to_str(state()));
00422 return;
00423 }
00424
00425 set_state(OPENING);
00426
00427
00428
00429
00430 ASSERT(contact_ == NULL);
00431 contact_ = new Contact(LinkRef(this, "Link::open"));
00432 clayer()->open_contact(contact_);
00433
00434 stats_.contact_attempts_++;
00435
00436 log_debug("Link::open: *%p new contact %p", this, contact_.object());
00437 }
00438
00439
00440 void
00441 Link::close()
00442 {
00443 log_debug("Link::close");
00444
00445
00446 if (contact_ == NULL) {
00447 log_err("Link::close with no contact");
00448 return;
00449 }
00450
00451
00452
00453 clayer()->close_contact(contact_);
00454 ASSERT(contact_->cl_info() == NULL);
00455
00456
00457
00458 contact_ = NULL;
00459
00460 log_debug("Link::close complete");
00461 }
00462
00463
00464 bool
00465 Link::queue_is_full() const
00466 {
00467 return ((bundles_queued_ > params_.qlimit_bundles_high_) ||
00468 (bytes_queued_ > params_.qlimit_bytes_high_));
00469 }
00470
00471
00472 bool
00473 Link::queue_has_space() const
00474 {
00475 return ((bundles_queued_ < params_.qlimit_bundles_low_) &&
00476 (bytes_queued_ < params_.qlimit_bytes_low_));
00477 }
00478
00479
00480 bool
00481 Link::add_to_queue(const BundleRef& bundle, size_t total_len)
00482 {
00483 oasys::ScopeLock l(&lock_, "Link::add_to_queue");
00484
00485 if (queue_.contains(bundle)) {
00486 log_err("add_to_queue: bundle *%p already in queue for link %s",
00487 bundle.object(), name_.c_str());
00488 return false;
00489 }
00490
00491 log_debug("adding *%p to queue (length %u)",
00492 bundle.object(), bundles_queued_);
00493 bundles_queued_++;
00494 bytes_queued_ += total_len;
00495 queue_.push_back(bundle);
00496
00497 return true;
00498 }
00499
00500
00501 bool
00502 Link::del_from_queue(const BundleRef& bundle, size_t total_len)
00503 {
00504 oasys::ScopeLock l(&lock_, "Link::del_from_queue");
00505
00506 if (! queue_.erase(bundle)) {
00507 return false;
00508 }
00509
00510 ASSERT(bundles_queued_ > 0);
00511 bundles_queued_--;
00512
00513
00514 ASSERT(total_len != 0);
00515 if (bytes_queued_ >= total_len) {
00516 bytes_queued_ -= total_len;
00517
00518 } else {
00519 log_err("del_from_queue: *%p bytes_queued %u < total_len %zu",
00520 bundle.object(), bytes_queued_, total_len);
00521 }
00522
00523 log_debug("removed *%p from queue (length %u)",
00524 bundle.object(), bundles_queued_);
00525 return true;
00526 }
00527
00528 bool
00529 Link::add_to_inflight(const BundleRef& bundle, size_t total_len)
00530 {
00531 oasys::ScopeLock l(&lock_, "Link::add_to_inflight");
00532
00533 if (bundle->is_queued_on(&inflight_)) {
00534 log_err("bundle *%p already in flight for link %s",
00535 bundle.object(), name_.c_str());
00536 return false;
00537 }
00538
00539 log_debug("adding *%p to in flight list for link %s",
00540 bundle.object(), name_.c_str());
00541
00542 inflight_.push_back(bundle.object());
00543
00544 bundles_inflight_++;
00545 bytes_inflight_ += total_len;
00546
00547 return true;
00548 }
00549
00550
00551 bool
00552 Link::del_from_inflight(const BundleRef& bundle, size_t total_len)
00553 {
00554 oasys::ScopeLock l(&lock_, "Link::del_from_inflight");
00555
00556 if (! inflight_.erase(bundle)) {
00557 return false;
00558 }
00559
00560 ASSERT(bundles_inflight_ > 0);
00561 bundles_inflight_--;
00562
00563
00564 ASSERT(total_len != 0);
00565 if (bytes_inflight_ >= total_len) {
00566 bytes_inflight_ -= total_len;
00567
00568 } else {
00569 log_err("del_from_inflight: *%p bytes_inflight %u < total_len %zu",
00570 bundle.object(), bytes_inflight_, total_len);
00571 }
00572
00573 log_debug("removed *%p from inflight list (length %u)",
00574 bundle.object(), bundles_inflight_);
00575 return true;
00576 }
00577
00578
00579 int
00580 Link::format(char* buf, size_t sz) const
00581 {
00582 return snprintf(buf, sz, "%s [%s %s %s %s state=%s]",
00583 name(), nexthop(), remote_eid_.c_str(),
00584 link_type_to_str(type()),
00585 clayer()->name(),
00586 state_to_str(state()));
00587 }
00588
00589
00590 void
00591 Link::dump(oasys::StringBuffer* buf)
00592 {
00593 oasys::ScopeLock l(&lock_, "Link::dump");
00594
00595 if (isdeleted()) {
00596 log_debug("Link::dump: cannot dump deleted link %s", name());
00597 return;
00598 }
00599
00600 buf->appendf("Link %s:\n"
00601 "clayer: %s\n"
00602 "type: %s\n"
00603 "state: %s\n"
00604 "deleted: %s\n"
00605 "nexthop: %s\n"
00606 "remote eid: %s\n"
00607 "mtu: %u\n"
00608 "min_retry_interval: %u\n"
00609 "max_retry_interval: %u\n"
00610 "idle_close_time: %u\n"
00611 "potential_downtime: %u\n"
00612 "prevhop_hdr: %s\n",
00613 name(),
00614 clayer_->name(),
00615 link_type_to_str(type()),
00616 state_to_str(state()),
00617 (deleted_? "true" : "false"),
00618 nexthop(),
00619 remote_eid_.c_str(),
00620 params_.mtu_,
00621 params_.min_retry_interval_,
00622 params_.max_retry_interval_,
00623 params_.idle_close_time_,
00624 params_.potential_downtime_,
00625 params_.prevhop_hdr_ ? "true" : "false");
00626
00627 ASSERT(clayer_ != NULL);
00628 clayer_->dump_link(LinkRef(this, "Link::dump"), buf);
00629 }
00630
00631
00632 void
00633 Link::dump_stats(oasys::StringBuffer* buf)
00634 {
00635 oasys::ScopeLock l(&lock_, "Link::dump_stats");
00636
00637 if (isdeleted()) {
00638 log_debug("Link::dump_stats: "
00639 "cannot dump stats for deleted link %s", name());
00640 return;
00641 }
00642
00643 u_int32_t uptime = stats_.uptime_;
00644 if (contact_ != NULL) {
00645 uptime += (contact_->start_time().elapsed_ms() / 1000);
00646 }
00647
00648 u_int32_t throughput = 0;
00649 if (uptime != 0) {
00650 throughput = (stats_.bytes_transmitted_ * 8) / uptime;
00651 }
00652
00653 buf->appendf("%u contact_attempts -- "
00654 "%u contacts -- "
00655 "%u bundles_transmitted -- "
00656 "%u bytes_transmitted -- "
00657 "%u bundles_queued -- "
00658 "%u bytes_queued -- "
00659 "%u bundles_inflight -- "
00660 "%u bytes_inflight -- "
00661 "%u bundles_cancelled -- "
00662 "%u uptime -- "
00663 "%u throughput_bps",
00664 stats_.contact_attempts_,
00665 stats_.contacts_,
00666 stats_.bundles_transmitted_,
00667 stats_.bytes_transmitted_,
00668 bundles_queued_,
00669 bytes_queued_,
00670 bundles_inflight_,
00671 bytes_inflight_,
00672 stats_.bundles_cancelled_,
00673 uptime,
00674 throughput);
00675
00676 if (router_info_) {
00677 router_info_->dump_stats(buf);
00678 }
00679 }
00680
00681 }