/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include "binomial_bounds.hpp" #include "theta_helpers.hpp" namespace datasketches { template bool tuple_sketch::is_estimation_mode() const { return get_theta64() < theta_constants::MAX_THETA && !is_empty(); } template double tuple_sketch::get_theta() const { return static_cast(get_theta64()) / static_cast(theta_constants::MAX_THETA); } template double tuple_sketch::get_estimate() const { return get_num_retained() / get_theta(); } template double tuple_sketch::get_lower_bound(uint8_t num_std_devs, uint32_t num_subset_entries) const { num_subset_entries = std::min(num_subset_entries, get_num_retained()) ; if (!is_estimation_mode()) return num_subset_entries; return binomial_bounds::get_lower_bound(num_subset_entries, get_theta(), num_std_devs); } template double tuple_sketch::get_lower_bound(uint8_t num_std_devs) const { return get_lower_bound(num_std_devs, get_num_retained()) ; } template double tuple_sketch::get_upper_bound(uint8_t num_std_devs, uint32_t num_subset_entries) const { num_subset_entries = std::min(num_subset_entries, get_num_retained()) ; if (!is_estimation_mode()) return num_subset_entries; return binomial_bounds::get_upper_bound(num_subset_entries, get_theta(), num_std_devs); } template double tuple_sketch::get_upper_bound(uint8_t num_std_devs) const { return get_upper_bound(num_std_devs, get_num_retained()) ; } template string tuple_sketch::to_string(bool detail) const { // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements. // The stream does not support passing an allocator instance, and alternatives are complicated. std::ostringstream os; os << "### Tuple sketch summary:" << std::endl; os << " num retained entries : " << get_num_retained() << std::endl; os << " seed hash : " << get_seed_hash() << std::endl; os << " empty? : " << (is_empty() ? "true" : "false") << std::endl; os << " ordered? : " << (is_ordered() ? "true" : "false") << std::endl; os << " estimation mode? : " << (is_estimation_mode() ? "true" : "false") << std::endl; os << " theta (fraction) : " << get_theta() << std::endl; os << " theta (raw 64-bit) : " << get_theta64() << std::endl; os << " estimate : " << this->get_estimate() << std::endl; os << " lower bound 95% conf : " << this->get_lower_bound(2) << std::endl; os << " upper bound 95% conf : " << this->get_upper_bound(2) << std::endl; print_specifics(os); os << "### End sketch summary" << std::endl; if (detail) { os << "### Retained entries" << std::endl; for (const auto& it: *this) { os << it.first << ": " << it.second << std::endl; } os << "### End retained entries" << std::endl; } return string(os.str().c_str(), get_allocator()); } // update sketch template update_tuple_sketch::update_tuple_sketch(uint8_t lg_cur_size, uint8_t lg_nom_size, resize_factor rf, float p, uint64_t theta, uint64_t seed, const P& policy, const A& allocator): policy_(policy), map_(lg_cur_size, lg_nom_size, rf, p, theta, seed, allocator) {} template A update_tuple_sketch::get_allocator() const { return map_.allocator_; } template bool update_tuple_sketch::is_empty() const { return map_.is_empty_; } template bool update_tuple_sketch::is_ordered() const { return map_.num_entries_ > 1 ? false : true;; } template uint64_t update_tuple_sketch::get_theta64() const { return is_empty() ? theta_constants::MAX_THETA : map_.theta_; } template uint32_t update_tuple_sketch::get_num_retained() const { return map_.num_entries_; } template uint16_t update_tuple_sketch::get_seed_hash() const { return compute_seed_hash(map_.seed_); } template uint8_t update_tuple_sketch::get_lg_k() const { return map_.lg_nom_size_; } template auto update_tuple_sketch::get_rf() const -> resize_factor { return map_.rf_; } template template void update_tuple_sketch::update(uint64_t key, UU&& value) { update(&key, sizeof(key), std::forward(value)); } template template void update_tuple_sketch::update(int64_t key, UU&& value) { update(&key, sizeof(key), std::forward(value)); } template template void update_tuple_sketch::update(uint32_t key, UU&& value) { update(static_cast(key), std::forward(value)); } template template void update_tuple_sketch::update(int32_t key, UU&& value) { update(static_cast(key), std::forward(value)); } template template void update_tuple_sketch::update(uint16_t key, UU&& value) { update(static_cast(key), std::forward(value)); } template template void update_tuple_sketch::update(int16_t key, UU&& value) { update(static_cast(key), std::forward(value)); } template template void update_tuple_sketch::update(uint8_t key, UU&& value) { update(static_cast(key), std::forward(value)); } template template void update_tuple_sketch::update(int8_t key, UU&& value) { update(static_cast(key), std::forward(value)); } template template void update_tuple_sketch::update(const std::string& key, UU&& value) { if (key.empty()) return; update(key.c_str(), key.length(), std::forward(value)); } template template void update_tuple_sketch::update(double key, UU&& value) { update(canonical_double(key), std::forward(value)); } template template void update_tuple_sketch::update(float key, UU&& value) { update(static_cast(key), std::forward(value)); } template template void update_tuple_sketch::update(const void* key, size_t length, UU&& value) { const uint64_t hash = map_.hash_and_screen(key, length); if (hash == 0) return; auto result = map_.find(hash); if (!result.second) { S summary = policy_.create(); policy_.update(summary, std::forward(value)); map_.insert(result.first, Entry(hash, std::move(summary))); } else { policy_.update((*result.first).second, std::forward(value)); } } template void update_tuple_sketch::trim() { map_.trim(); } template void update_tuple_sketch::reset() { map_.reset(); } template auto update_tuple_sketch::begin() -> iterator { return iterator(map_.entries_, 1 << map_.lg_cur_size_, 0); } template auto update_tuple_sketch::end() -> iterator { return iterator(nullptr, 0, 1 << map_.lg_cur_size_); } template auto update_tuple_sketch::begin() const -> const_iterator { return const_iterator(map_.entries_, 1 << map_.lg_cur_size_, 0); } template auto update_tuple_sketch::end() const -> const_iterator { return const_iterator(nullptr, 0, 1 << map_.lg_cur_size_); } template compact_tuple_sketch update_tuple_sketch::compact(bool ordered) const { return compact_tuple_sketch(*this, ordered); } template void update_tuple_sketch::print_specifics(std::ostringstream& os) const { os << " lg nominal size : " << (int) map_.lg_nom_size_ << std::endl; os << " lg current size : " << (int) map_.lg_cur_size_ << std::endl; os << " resize factor : " << (1 << map_.rf_) << std::endl; } // compact sketch template compact_tuple_sketch::compact_tuple_sketch(bool is_empty, bool is_ordered, uint16_t seed_hash, uint64_t theta, std::vector&& entries): is_empty_(is_empty), is_ordered_(is_ordered || (entries.size() <= 1ULL)), seed_hash_(seed_hash), theta_(theta), entries_(std::move(entries)) {} template compact_tuple_sketch::compact_tuple_sketch(const Base& other, bool ordered): is_empty_(other.is_empty()), is_ordered_(other.is_ordered() || ordered), seed_hash_(other.get_seed_hash()), theta_(other.get_theta64()), entries_(other.get_allocator()) { entries_.reserve(other.get_num_retained()); std::copy(other.begin(), other.end(), std::back_inserter(entries_)); if (ordered && !other.is_ordered()) std::sort(entries_.begin(), entries_.end(), comparator()); } template compact_tuple_sketch::compact_tuple_sketch(compact_tuple_sketch&& other) noexcept: is_empty_(other.is_empty()), is_ordered_(other.is_ordered()), seed_hash_(other.get_seed_hash()), theta_(other.get_theta64()), entries_(std::move(other.entries_)) {} template compact_tuple_sketch::compact_tuple_sketch(const theta_sketch_alloc& other, const S& summary, bool ordered): is_empty_(other.is_empty()), is_ordered_(other.is_ordered() || ordered), seed_hash_(other.get_seed_hash()), theta_(other.get_theta64()), entries_(other.get_allocator()) { entries_.reserve(other.get_num_retained()); for (uint64_t hash: other) { entries_.push_back(Entry(hash, summary)); } if (ordered && !other.is_ordered()) std::sort(entries_.begin(), entries_.end(), comparator()); } template A compact_tuple_sketch::get_allocator() const { return entries_.get_allocator(); } template bool compact_tuple_sketch::is_empty() const { return is_empty_; } template bool compact_tuple_sketch::is_ordered() const { return is_ordered_; } template uint64_t compact_tuple_sketch::get_theta64() const { return theta_; } template uint32_t compact_tuple_sketch::get_num_retained() const { return static_cast(entries_.size()); } template uint16_t compact_tuple_sketch::get_seed_hash() const { return seed_hash_; } // implementation for fixed-size arithmetic types (integral and floating point) template template::value, int>::type> size_t compact_tuple_sketch::get_serialized_size_summaries_bytes(const SD& sd) const { unused(sd); return entries_.size() * sizeof(SS); } // implementation for all other types (non-arithmetic) template template::value, int>::type> size_t compact_tuple_sketch::get_serialized_size_summaries_bytes(const SD& sd) const { size_t size = 0; for (const auto& it: entries_) { size += sd.size_of_item(it.second); } return size; } template template void compact_tuple_sketch::serialize(std::ostream& os, const SerDe& sd) const { const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2; write(os, preamble_longs); const uint8_t serial_version = SERIAL_VERSION; write(os, serial_version); const uint8_t family = SKETCH_FAMILY; write(os, family); const uint8_t type = SKETCH_TYPE; write(os, type); const uint8_t unused8 = 0; write(os, unused8); const uint8_t flags_byte( (1 << flags::IS_COMPACT) | (1 << flags::IS_READ_ONLY) | (this->is_empty() ? 1 << flags::IS_EMPTY : 0) | (this->is_ordered() ? 1 << flags::IS_ORDERED : 0) ); write(os, flags_byte); const uint16_t seed_hash = get_seed_hash(); write(os, seed_hash); if (preamble_longs > 1) { const uint32_t num_entries = static_cast(entries_.size()); write(os, num_entries); const uint32_t unused32 = 0; write(os, unused32); } if (this->is_estimation_mode()) { write(os, this->theta_); } for (const auto& it: entries_) { write(os, it.first); sd.serialize(os, &it.second, 1); } } template template auto compact_tuple_sketch::serialize(unsigned header_size_bytes, const SerDe& sd) const -> vector_bytes { const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2; const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs + sizeof(uint64_t) * entries_.size() + get_serialized_size_summaries_bytes(sd); vector_bytes bytes(size, 0, entries_.get_allocator()); uint8_t* ptr = bytes.data() + header_size_bytes; const uint8_t* end_ptr = ptr + size; ptr += copy_to_mem(preamble_longs, ptr); const uint8_t serial_version = SERIAL_VERSION; ptr += copy_to_mem(serial_version, ptr); const uint8_t family = SKETCH_FAMILY; ptr += copy_to_mem(family, ptr); const uint8_t type = SKETCH_TYPE; ptr += copy_to_mem(type, ptr); ptr += sizeof(uint8_t); // unused const uint8_t flags_byte( (1 << flags::IS_COMPACT) | (1 << flags::IS_READ_ONLY) | (this->is_empty() ? 1 << flags::IS_EMPTY : 0) | (this->is_ordered() ? 1 << flags::IS_ORDERED : 0) ); ptr += copy_to_mem(flags_byte, ptr); const uint16_t seed_hash = get_seed_hash(); ptr += copy_to_mem(seed_hash, ptr); if (preamble_longs > 1) { const uint32_t num_entries = static_cast(entries_.size()); ptr += copy_to_mem(num_entries, ptr); ptr += sizeof(uint32_t); // unused } if (this->is_estimation_mode()) { ptr += copy_to_mem(theta_, ptr); } for (const auto& it: entries_) { ptr += copy_to_mem(it.first, ptr); ptr += sd.serialize(ptr, end_ptr - ptr, &it.second, 1); } return bytes; } template template compact_tuple_sketch compact_tuple_sketch::deserialize(std::istream& is, uint64_t seed, const SerDe& sd, const A& allocator) { const auto preamble_longs = read(is); const auto serial_version = read(is); const auto family = read(is); const auto type = read(is); read(is); // unused const auto flags_byte = read(is); const auto seed_hash = read(is); if (serial_version != SERIAL_VERSION && serial_version != SERIAL_VERSION_LEGACY) { throw std::invalid_argument("serial version mismatch: expected " + std::to_string(SERIAL_VERSION) + " or " + std::to_string(SERIAL_VERSION_LEGACY) + ", actual " + std::to_string(serial_version)); } checker::check_sketch_family(family, SKETCH_FAMILY); if (type != SKETCH_TYPE && type != SKETCH_TYPE_LEGACY) { throw std::invalid_argument("sketch type mismatch: expected " + std::to_string(SKETCH_TYPE) + " or " + std::to_string(SKETCH_TYPE_LEGACY) + ", actual " + std::to_string(type)); } const bool is_empty = flags_byte & (1 << flags::IS_EMPTY); if (!is_empty) checker::check_seed_hash(seed_hash, compute_seed_hash(seed)); uint64_t theta = theta_constants::MAX_THETA; uint32_t num_entries = 0; if (!is_empty) { if (preamble_longs == 1) { num_entries = 1; } else { num_entries = read(is); read(is); // unused if (preamble_longs > 2) { theta = read(is); } } } A alloc(allocator); std::vector entries(alloc); if (!is_empty) { entries.reserve(num_entries); std::unique_ptr summary(alloc.allocate(1), deleter_of_summaries(1, false, allocator)); for (size_t i = 0; i < num_entries; ++i) { const auto key = read(is); sd.deserialize(is, summary.get(), 1); entries.push_back(Entry(key, std::move(*summary))); (*summary).~S(); } } if (!is.good()) throw std::runtime_error("error reading from std::istream"); const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED); return compact_tuple_sketch(is_empty, is_ordered, seed_hash, theta, std::move(entries)); } template template compact_tuple_sketch compact_tuple_sketch::deserialize(const void* bytes, size_t size, uint64_t seed, const SerDe& sd, const A& allocator) { ensure_minimum_memory(size, 8); const char* ptr = static_cast(bytes); const char* base = ptr; uint8_t preamble_longs; ptr += copy_from_mem(ptr, preamble_longs); uint8_t serial_version; ptr += copy_from_mem(ptr, serial_version); uint8_t family; ptr += copy_from_mem(ptr, family); uint8_t type; ptr += copy_from_mem(ptr, type); ptr += sizeof(uint8_t); // unused uint8_t flags_byte; ptr += copy_from_mem(ptr, flags_byte); uint16_t seed_hash; ptr += copy_from_mem(ptr, seed_hash); if (serial_version != SERIAL_VERSION && serial_version != SERIAL_VERSION_LEGACY) { throw std::invalid_argument("serial version mismatch: expected " + std::to_string(SERIAL_VERSION) + " or " + std::to_string(SERIAL_VERSION_LEGACY) + ", actual " + std::to_string(serial_version)); } checker::check_sketch_family(family, SKETCH_FAMILY); if (type != SKETCH_TYPE && type != SKETCH_TYPE_LEGACY) { throw std::invalid_argument("sketch type mismatch: expected " + std::to_string(SKETCH_TYPE) + " or " + std::to_string(SKETCH_TYPE_LEGACY) + ", actual " + std::to_string(type)); } const bool is_empty = flags_byte & (1 << flags::IS_EMPTY); if (!is_empty) checker::check_seed_hash(seed_hash, compute_seed_hash(seed)); uint64_t theta = theta_constants::MAX_THETA; uint32_t num_entries = 0; if (!is_empty) { if (preamble_longs == 1) { num_entries = 1; } else { ensure_minimum_memory(size, 8); // read the first prelong before this method ptr += copy_from_mem(ptr, num_entries); ptr += sizeof(uint32_t); // unused if (preamble_longs > 2) { ensure_minimum_memory(size, (preamble_longs - 1) << 3); ptr += copy_from_mem(ptr, theta); } } } const size_t keys_size_bytes = sizeof(uint64_t) * num_entries; ensure_minimum_memory(size, ptr - base + keys_size_bytes); A alloc(allocator); std::vector entries(alloc); if (!is_empty) { entries.reserve(num_entries); std::unique_ptr summary(alloc.allocate(1), deleter_of_summaries(1, false, allocator)); for (size_t i = 0; i < num_entries; ++i) { uint64_t key; ptr += copy_from_mem(ptr, key); ptr += sd.deserialize(ptr, base + size - ptr, summary.get(), 1); entries.push_back(Entry(key, std::move(*summary))); (*summary).~S(); } } const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED); return compact_tuple_sketch(is_empty, is_ordered, seed_hash, theta, std::move(entries)); } template auto compact_tuple_sketch::begin() -> iterator { return iterator(entries_.data(), static_cast(entries_.size()), 0); } template auto compact_tuple_sketch::end() -> iterator { return iterator(nullptr, 0, static_cast(entries_.size())); } template auto compact_tuple_sketch::begin() const -> const_iterator { return const_iterator(entries_.data(), static_cast(entries_.size()), 0); } template auto compact_tuple_sketch::end() const -> const_iterator { return const_iterator(nullptr, 0, static_cast(entries_.size())); } template void compact_tuple_sketch::print_specifics(std::ostringstream&) const {} // builder template tuple_base_builder::tuple_base_builder(const P& policy, const A& allocator): theta_base_builder(allocator), policy_(policy) {} template update_tuple_sketch::builder::builder(const P& policy, const A& allocator): tuple_base_builder(policy, allocator) {} template auto update_tuple_sketch::builder::build() const -> update_tuple_sketch { return update_tuple_sketch(this->starting_lg_size(), this->lg_k_, this->rf_, this->p_, this->starting_theta(), this->seed_, this->policy_, this->allocator_); } } /* namespace datasketches */