00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "Repository.h"
00018
// Convenience wrapper around core_->print_log() that tags every message
// with the "repository" component name and resolves the level inside
// BundleCore.  The expansion is a single expression WITHOUT a trailing
// semicolon so that `LOG(...);` is exactly one statement and can be used
// safely in an unbraced if/else.  Standard __VA_ARGS__ replaces the
// GNU-only named variadic parameter.
#define LOG(_level, ...) core_->print_log("repository", \
        BundleCore::_level, __VA_ARGS__)
00021
00022 namespace prophet
00023 {
00024
00025 Repository::Repository(BundleCoreRep* core,
00026 QueueComp* qc,
00027 const BundleList* list)
00028 : core_(core), comp_(NULL), current_(0)
00029 {
00030
00031 if (qc == NULL)
00032 {
00033 comp_ = QueuePolicy::policy(QueuePolicy::FIFO);
00034 }
00035 else
00036 {
00037 comp_ = qc;
00038 }
00039
00040 if (list != NULL)
00041 for(const_iterator i = list->begin(); i != list->end(); i++)
00042 add(*i);
00043 }
00044
00045 Repository::~Repository()
00046 {
00047 delete comp_;
00048 }
00049
00050 void
00051 Repository::del(const Bundle* b)
00052 {
00053 LOG(LOG_DEBUG,"del request");
00054 if (b == NULL)
00055 {
00056 LOG(LOG_DEBUG,"NULL bundle");
00057 return;
00058 }
00059
00060
00061 iterator i;
00062 if (find(b,i))
00063 {
00064
00065
00066 b = *i;
00067
00068 current_ -= b->size();
00069
00070
00071 remove_and_reheap(i-list_.begin());
00072
00073 list_.pop_back();
00074 LOG(LOG_DEBUG,"removed %d from list",b->sequence_num());
00075 }
00076 }
00077
00078 bool
00079 Repository::add(const Bundle* b)
00080 {
00081 LOG(LOG_DEBUG, "add request");
00082 if (b == NULL)
00083 {
00084 LOG(LOG_DEBUG,"NULL bundle");
00085 return false;
00086 }
00087
00088
00089 iterator i;
00090 if (find(b,i))
00091 return false;
00092
00093
00094 list_.push_back(b);
00095
00096 size_t last_pos = list_.size() - 1;
00097 push_heap(0,last_pos,0,list_[last_pos]);
00098
00099 current_ += b->size();
00100
00101 if (core_->max_bundle_quota() > 0)
00102 while (core_->max_bundle_quota() < current_)
00103 evict();
00104 return true;
00105 }
00106
00107 void
00108 Repository::set_comparator(QueueComp* qc)
00109 {
00110 if (qc == NULL) return;
00111 LOG(LOG_DEBUG,"changing policy from %s to %s",
00112 QueuePolicy::qp_to_str(comp_->qp()),
00113 QueuePolicy::qp_to_str(qc->qp()));
00114
00115 delete comp_;
00116 comp_ = qc;
00117
00118 if (!list_.empty())
00119 make_heap(0, (list_.size() - 1));
00120 }
00121
00122 void
00123 Repository::handle_change_max()
00124 {
00125
00126 if (core_->max_bundle_quota() > 0)
00127 while (core_->max_bundle_quota() < current_)
00128 evict();
00129 }
00130
00131 void
00132 Repository::change_priority(const Bundle* b)
00133 {
00134 LOG(LOG_DEBUG,"change priority request %d",
00135 b == NULL ? 0 : b->sequence_num());
00136
00137 if (b == NULL)
00138 return;
00139
00140 iterator i = list_.begin();
00141
00142 while (i != list_.end())
00143 if (*(*i) == *b)
00144 break;
00145 else i++;
00146
00147 if (i != list_.end())
00148 {
00149
00150 remove_and_reheap(i - list_.begin());
00151
00152 push_heap(0,list_.size() - 1,0,b);
00153 }
00154 }
00155
00156 void
00157 Repository::evict()
00158 {
00159 if (comp_->qp() != QueuePolicy::LEPR)
00160 {
00161 do_evict:
00162 size_t last_pos = list_.size() - 1;
00163
00164 pop_heap(0, last_pos, last_pos, list_[last_pos]);
00165
00166 const Bundle* b = list_.back();
00167
00168 list_.pop_back();
00169
00170 core_->drop_bundle(b);
00171
00172 current_ -= b->size();
00173
00174 return;
00175 }
00176 else
00177 {
00178
00179
00180
00181 size_t len = list_.size();
00182 for (size_t pos = 0; pos < len; pos++)
00183 {
00184 const Bundle* b = list_[pos];
00185 if (comp_->min_fwd_ < b->num_forward())
00186 {
00187
00188
00189 current_ -= b->size();
00190
00191
00192 remove_and_reheap(pos);
00193
00194 list_.pop_back();
00195 return;
00196 }
00197 }
00198
00199
00200
00201
00202
00203
00204 goto do_evict;
00205 }
00206 }
00207
00208 void
00209 Repository::push_heap(size_t first, size_t hole, size_t top, const Bundle* b)
00210 {
00211 size_t parent = (hole - 1) / 2;
00212 while (hole > top && (*comp_)(list_[first + parent],b))
00213 {
00214 list_[first + hole] = list_[first + parent];
00215 hole = parent;
00216 parent = (hole - 1) / 2;
00217 }
00218 list_[first + hole] = b;
00219 }
00220
00221 void
00222 Repository::pop_heap(size_t first, size_t last, size_t result, const Bundle* b)
00223 {
00224 list_[result] = list_[first];
00225 adjust_heap(first, 0, last - first, b);
00226 }
00227
00228 void
00229 Repository::adjust_heap(size_t first, size_t hole, size_t len, const Bundle* b)
00230 {
00231
00232 if (list_.size() < 2)
00233 return;
00234
00235 const size_t top = hole;
00236 size_t second = 2 * hole + 2;
00237 while (second < len)
00238 {
00239 if ((*comp_)(list_[first + second], list_[first + (second - 1)]))
00240 second--;
00241 list_[first + hole] = list_[first + second];
00242 hole = second;
00243 second = 2 * (second + 1);
00244 }
00245 if (second == len)
00246 {
00247 list_[first + hole] = list_[first + (second - 1)];
00248 hole = second - 1;
00249 }
00250 push_heap(first, hole, top, b);
00251 }
00252
00253 void
00254 Repository::remove_and_reheap(size_t hole)
00255 {
00256
00257 list_[hole] = list_[list_.size() - 1];
00258
00259 adjust_heap(0, hole, list_.size() - 2, list_[hole]);
00260 }
00261
00262 void
00263 Repository::make_heap(size_t first, size_t last)
00264 {
00265
00266 if (last < first + 2) return;
00267
00268 size_t len = last - first;
00269 size_t parent = (len - 2) / 2;
00270 while (true)
00271 {
00272 adjust_heap(first, parent, len, list_[first + parent]);
00273 if (parent == 0) break;
00274 parent--;
00275 }
00276 }
00277
00278 bool
00279 Repository::find(const Bundle* b, iterator& i)
00280 {
00281 if (list_.empty()) return false;
00282
00283 i = list_.begin();
00284 while (i != list_.end())
00285 if (b == *i)
00286 break;
00287 else
00288 i++;
00289 return (i != list_.end() && b == *i);
00290 }
00291
00292 };