/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef _EBPPS_SAMPLE_IMPL_HPP_ #define _EBPPS_SAMPLE_IMPL_HPP_ #include "common_defs.hpp" #include "conditional_forward.hpp" #include "ebpps_sample.hpp" #include "serde.hpp" #include #include #include namespace datasketches { template ebpps_sample::ebpps_sample(uint32_t reserved_size, const A& allocator) : allocator_(allocator), c_(0.0), partial_item_(), data_(allocator) { data_.reserve(reserved_size); } template template ebpps_sample::ebpps_sample(TT&& item, double theta, const A& allocator) : allocator_(allocator), c_(theta), partial_item_(), data_(allocator) { if (theta == 1.0) { data_.reserve(1); data_.emplace_back(std::forward(item)); } else { partial_item_.emplace(std::forward(item)); } } template ebpps_sample::ebpps_sample(std::vector&& data, optional&& partial_item, double c, const A& allocator) : allocator_(allocator), c_(c), partial_item_(partial_item), data_(data, allocator) {} template auto ebpps_sample::get_sample() const -> result_type { double unused; const double c_frac = std::modf(c_, &unused); const bool include_partial = next_double() < c_frac; const uint32_t result_size = static_cast(data_.size()) + (include_partial ? 1 : 0); result_type result; result.reserve(result_size); std::copy(data_.begin(), data_.end(), std::back_inserter(result)); if (include_partial) result.emplace_back(static_cast(*partial_item_)); return result; } template void ebpps_sample::downsample(double theta) { if (theta >= 1.0) return; const double new_c = theta * c_; double new_c_int; const double new_c_frac = std::modf(new_c, &new_c_int); double c_int; const double c_frac = std::modf(c_, &c_int); if (new_c_int == 0.0) { // no full items retained if (next_double() > (c_frac / c_)) { swap_with_partial(); } data_.clear(); } else if (new_c_int == c_int) { // no items deleted if (next_double() > (1 - theta * c_frac)/(1 - new_c_frac)) { swap_with_partial(); } } else { if (next_double() < theta * c_frac) { // subsample data in random order; last item is partial // create sample size new_c_int then swap_with_partial() subsample(static_cast(new_c_int)); swap_with_partial(); } else { // create sample size new_c_int + 1 then move_one_to_partial) subsample(static_cast(new_c_int) + 1); move_one_to_partial(); } } if (new_c == new_c_int) partial_item_.reset(); c_ = new_c; } template template void ebpps_sample::merge(FwdSample&& other) { double c_int; const double c_frac = std::modf(c_, &c_int); double unused; const double other_c_frac = std::modf(other.c_, &unused); // update c_ here but do NOT recompute fractional part yet c_ += other.c_; for (uint32_t i = 0; i < other.data_.size(); ++i) data_.emplace_back(conditional_forward(other.data_[i])); // This modifies the original algorithm slightly due to numeric // precision issues. Specifically, the test if c_frac + other_c_frac == 1.0 // happens before tests for < 1.0 or > 1.0 and can also be triggered // if c_ == floor(c_) (the updated value of c_, not the input). // // We can still run into issues where c_frac + other_c_frac == epsilon // and the first case would have ideally triggered. As a result, we must // check if the partial item exists before adding to the data_ vector. if (c_frac == 0.0 && other_c_frac == 0.0) { partial_item_.reset(); } else if (c_frac + other_c_frac == 1.0 || c_ == std::floor(c_)) { if (next_double() <= c_frac) { if (partial_item_) data_.emplace_back(std::move(*partial_item_)); } else { if (other.partial_item_) data_.emplace_back(conditional_forward(*other.partial_item_)); } partial_item_.reset(); } else if (c_frac + other_c_frac < 1.0) { if (next_double() > c_frac / (c_frac + other_c_frac)) { set_partial(conditional_forward(*other.partial_item_)); } } else { // c_frac + other_c_frac > 1 if (next_double() <= (1 - c_frac) / ((1 - c_frac) + (1 - other_c_frac))) { data_.emplace_back(conditional_forward(*other.partial_item_)); } else { data_.emplace_back(std::move(*partial_item_)); partial_item_.reset(); set_partial(conditional_forward(*other.partial_item_)); } } } template string ebpps_sample::to_string() const { std::ostringstream oss; oss << " sample:" << std::endl; uint32_t idx = 0; for (const T& item : data_) oss << "\t" << idx++ << ":\t" << item << std::endl; oss << " partial: " << (bool(partial_item_) ? (*partial_item_) : "NULL") << std::endl; return oss.str(); } template void ebpps_sample::subsample(uint32_t num_samples) { // we can perform a Fisher-Yates style shuffle, stopping after // num_samples points since subsequent swaps would only be // between items after num_samples. This is valid since a // point from anywhere in the initial array would be eligible // to end up in the final subsample. if (num_samples == data_.size()) return; auto erase_start = data_.begin(); uint32_t data_len = static_cast(data_.size()); for (uint32_t i = 0; i < num_samples; ++i, ++erase_start) { uint32_t j = i + random_idx(data_len - i); std::swap(data_[i], data_[j]); } data_.erase(erase_start, data_.end()); } template template void ebpps_sample::set_partial(FwdItem&& item) { if (partial_item_) *partial_item_ = conditional_forward(item); else partial_item_.emplace(conditional_forward(item)); } template void ebpps_sample::move_one_to_partial() { const size_t idx = random_idx(static_cast(data_.size())); // swap selected item to end so we can delete it easily const size_t last_idx = data_.size() - 1; if (idx != last_idx) { std::swap(data_[idx], data_[last_idx]); } set_partial(std::move(data_[last_idx])); data_.pop_back(); } template void ebpps_sample::swap_with_partial() { if (partial_item_) { const size_t idx = random_idx(static_cast(data_.size())); std::swap(data_[idx], *partial_item_); } else { move_one_to_partial(); } } template void ebpps_sample::reset() { c_ = 0.0; partial_item_.reset(); data_.clear(); } template double ebpps_sample::get_c() const { return c_; } template auto ebpps_sample::get_full_items() const -> result_type { return result_type(data_); } template bool ebpps_sample::has_partial_item() const { return bool(partial_item_); } template T ebpps_sample::get_partial_item() const { if (!partial_item_) throw std::runtime_error("Call to get_partial_item() when no partial item exists"); return *partial_item_; } template uint32_t ebpps_sample::random_idx(uint32_t max) { static std::uniform_int_distribution dist; return dist(random_utils::rand, std::uniform_int_distribution::param_type(0, max - 1)); } template double ebpps_sample::next_double() { return random_utils::next_double(random_utils::rand); } template uint32_t ebpps_sample::get_num_retained_items() const { return static_cast(data_.size() + (partial_item_ ? 1 : 0)); } // implementation for fixed-size arithmetic types (integral and floating point) template template::value, int>::type> size_t ebpps_sample::get_serialized_size_bytes(const SerDe&) const { if (c_ == 0.0) return 0; else return sizeof(c_) + (data_.size() + (partial_item_ ? 1 : 0)) * sizeof(T); } // implementation for all other types template template::value, int>::type> size_t ebpps_sample::get_serialized_size_bytes(const SerDe& sd) const { if (c_ == 0.0) return 0; size_t num_bytes = sizeof(c_); for (auto it : data_) num_bytes += sd.size_of_item(it); if (partial_item_) num_bytes += sd.size_of_item(*partial_item_); return num_bytes; } template template size_t ebpps_sample::serialize(uint8_t* ptr, const uint8_t* end_ptr, const SerDe& sd) const { uint8_t* st_ptr = ptr; ensure_minimum_memory(end_ptr - ptr, sizeof(c_)); ptr += copy_to_mem(c_, ptr); ptr += sd.serialize(ptr, end_ptr - ptr, data_.data(), static_cast(data_.size())); if (partial_item_) { ptr += sd.serialize(ptr, end_ptr - ptr, &*partial_item_, 1); } return ptr - st_ptr; } template template void ebpps_sample::serialize(std::ostream& os, const SerDe& sd) const { write(os, c_); sd.serialize(os, data_.data(), static_cast(data_.size())); if (partial_item_) sd.serialize(os, &*partial_item_, 1); if (!os.good()) throw std::runtime_error("error writing to std::ostream"); } template template std::pair, size_t> ebpps_sample::deserialize(const uint8_t* ptr, size_t size, const SerDe& sd, const A& allocator) { const uint8_t* st_ptr = ptr; const uint8_t* end_ptr = ptr + size; ensure_minimum_memory(size, sizeof(double)); double c; ptr += copy_from_mem(ptr, c); if (c < 0.0) throw std::runtime_error("sketch image has C < 0.0 during deserializaiton"); double c_int; const double c_frac = std::modf(c, &c_int); const bool has_partial = c_frac != 0.0; const uint32_t num_full_items = static_cast(c_int); A alloc(allocator); std::unique_ptr items(alloc.allocate(num_full_items), items_deleter(allocator, false, num_full_items)); ptr += sd.deserialize(ptr, end_ptr - ptr, items.get(), num_full_items); // serde did not throw, enable destructors items.get_deleter().set_destroy(true); std::vector data(std::make_move_iterator(items.get()), std::make_move_iterator(items.get() + num_full_items), allocator); optional partial_item; if (has_partial) { optional tmp; // space to deserialize ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1); // serde did not throw so place item and clean up partial_item.emplace(*tmp); (*tmp).~T(); } return std::pair, size_t>( ebpps_sample(std::move(data), std::move(partial_item), c, allocator), ptr - st_ptr); } template template ebpps_sample ebpps_sample::deserialize(std::istream& is, const SerDe& sd, const A& allocator) { const double c = read(is); if (c < 0.0) throw std::runtime_error("sketch image has C < 0.0 during deserializaiton"); double c_int; const double c_frac = std::modf(c, &c_int); const bool has_partial = c_frac != 0.0; const uint32_t num_full_items = static_cast(c_int); A alloc(allocator); std::unique_ptr items(alloc.allocate(num_full_items), items_deleter(allocator, false, num_full_items)); sd.deserialize(is, items.get(), num_full_items); // serde did not throw, enable destructors items.get_deleter().set_destroy(true); std::vector data(std::make_move_iterator(items.get()), std::make_move_iterator(items.get() + num_full_items), allocator); optional partial_item; if (has_partial) { optional tmp; // space to deserialize sd.deserialize(is, &*tmp, 1); // serde did not throw so place item and clean up partial_item.emplace(*tmp); (*tmp).~T(); } if (!is.good()) throw std::runtime_error("error reading from std::istream"); return ebpps_sample(std::move(data), std::move(partial_item), c, allocator); } template typename ebpps_sample::const_iterator ebpps_sample::begin() const { return const_iterator(this); } template typename ebpps_sample::const_iterator ebpps_sample::end() const { return const_iterator(nullptr); } // -------- ebpps_sketch::const_iterator implementation --------- template ebpps_sample::const_iterator::const_iterator(const ebpps_sample* sample) : sample_(sample), idx_(0), use_partial_(false) { if (sample == nullptr) return; // determine in advance if we use the partial item double c_int; const double c_frac = std::modf(sample_->get_c(), &c_int); use_partial_ = sample->next_double() < c_frac; // sample with no items if (sample_->data_.size() == 0 && use_partial_) { idx_ = PARTIAL_IDX; } if (sample_->c_== 0.0 || (sample_->data_.size() == 0 && !sample_->has_partial_item())) { sample_ = nullptr; } } template ebpps_sample::const_iterator::const_iterator(const const_iterator& other) : sample_(other.sample_), idx_(other.idx_), use_partial_(other.use_partial_) {} template typename ebpps_sample::const_iterator& ebpps_sample::const_iterator::operator++() { if (sample_ == nullptr) return *this; else if (idx_ == PARTIAL_IDX) { idx_ = sample_->data_.size(); sample_ = nullptr; return * this; } ++idx_; if (idx_ == sample_->data_.size()) { if (use_partial_) idx_ = PARTIAL_IDX; else sample_ = nullptr; } return *this; } template typename ebpps_sample::const_iterator& ebpps_sample::const_iterator::operator++(int) { const_iterator tmp(*this); operator++(); return tmp; } template bool ebpps_sample::const_iterator::operator==(const const_iterator& other) const { if (sample_ != other.sample_) return false; if (sample_ == nullptr) return true; // end (and we know other.sample_ is also null) return idx_ == other.idx_; } template bool ebpps_sample::const_iterator::operator!=(const const_iterator& other) const { return !operator==(other); } template auto ebpps_sample::const_iterator::operator*() const -> reference { if (idx_ == PARTIAL_IDX) return *(sample_->partial_item_); else return sample_->data_[idx_]; } template auto ebpps_sample::const_iterator::operator->() const -> pointer { return **this; } template class ebpps_sample::items_deleter { public: items_deleter(const A& allocator, bool destroy, size_t num): allocator_(allocator), destroy_(destroy), num_(num) {} void operator() (T* ptr) { if (ptr != nullptr) { if (destroy_) { for (size_t i = 0; i < num_; ++i) { ptr[i].~T(); } } allocator_.deallocate(ptr, num_); } } void set_destroy(bool destroy) { destroy_ = destroy; } private: A allocator_; bool destroy_; size_t num_; }; } // namespace datasketches #endif // _EBPPS_SAMPLE_IMPL_HPP_