/*
 * Copyright 2018, Oath Inc. Licensed under the terms of the
 * Apache License 2.0. See LICENSE file at the project root for terms.
 */

#ifndef KLL_SKETCH_HPP_
#define KLL_SKETCH_HPP_

#include <functional>
#include <memory>
#include <iostream>
#include <algorithm>
#include <limits>
#include <string>
#include <cmath>
#include <cstring>
#include <cassert>

#include "kll_quantile_calculator.hpp"
#include "kll_helper.hpp"

namespace datasketches {

/*
 * Implementation of a very compact quantiles sketch with lazy compaction scheme
 * and nearly optimal accuracy per retained item.
 * See "Optimal Quantile Approximation in Streams".
 *
 * This is a stochastic streaming sketch that enables near-real-time analysis of the
 * approximate distribution of values from a very large stream in a single pass, requiring only
 * that the values are comparable.
 * The analysis is obtained using the get_quantile() or get_quantiles() functions or the
 * inverse functions get_rank(), get_PMF() (Probability Mass Function), and get_CDF()
 * (Cumulative Distribution Function).
 *
 * Given an input stream of N numeric values, the absolute rank of any specific
 * value is defined as its index (0 to N-1) in the hypothetical sorted stream of all
 * N input values.
 *
 * The normalized rank (rank) of any specific value is defined as its
 * absolute rank divided by N. Thus, the normalized rank is a value between zero and one.
 * In the documentation for this sketch, absolute rank is never used, so any
 * reference to just "rank" should be interpreted to mean normalized rank.
 *
 * This sketch is configured with a parameter k, which affects the size of the sketch
 * and its estimation error.
 *
 * The estimation error is commonly called epsilon (or eps) and is a fraction
 * between zero and one. Larger values of k result in smaller values of epsilon.
 * Epsilon is always with respect to the rank and cannot be applied to the
 * corresponding values.
 *
 * The relationship between the normalized rank and the corresponding values can be viewed
 * as a two-dimensional monotonic plot with the normalized rank on one axis and the
 * corresponding values on the other axis. If the y-axis is specified as the value-axis and
 * the x-axis as the normalized rank, then y = get_quantile(x) is a monotonically
 * increasing function.
 *
 * The functions get_quantile(rank) and get_quantiles(...) translate ranks into
 * corresponding values. The functions get_rank(value),
 * get_CDF(...) (Cumulative Distribution Function), and get_PMF(...)
 * (Probability Mass Function) perform the opposite operation and translate values into ranks.
 *
 * The get_PMF(...) function has about 13 to 47% worse rank error (depending
 * on k) than the other queries because the mass of each "bin" of the PMF has
 * "double-sided" error from the upper and lower edges of the bin as a result of a subtraction,
 * and the errors from the two edges can sometimes add.
 *
 * The default k of 200 yields a "single-sided" epsilon of about 1.33% and a
 * "double-sided" (PMF) epsilon of about 1.65%.

 * A get_quantile(rank) query has the following guarantees:
 * - Let v = get_quantile(r) where r is the rank between zero and one.
 * - The value v will be a value from the input stream.
 * - Let true_rank be the true rank of v derived from the hypothetical sorted
 *   stream of all N values.
 * - Let eps = get_normalized_rank_error(false).
 * - Then r - eps ≤ true_rank ≤ r + eps with a confidence of 99%. Note that the
 *   error is on the rank, not the value.
 *

 * A get_rank(value) query has the following guarantees:
 * - Let r = get_rank(v) where v is a value between the min and max values of
 *   the input stream.
 * - Let true_rank be the true rank of v derived from the hypothetical sorted
 *   stream of all N values.
 * - Let eps = get_normalized_rank_error(false).
 * - Then r - eps ≤ true_rank ≤ r + eps with a confidence of 99%.
 *

 * A get_PMF() query has the following guarantees:
 * - Let {r1, r2, ..., r(m+1)} = get_PMF(v1, v2, ..., vm) where v1, v2, ..., vm are values
 *   between the min and max values of the input stream.
 * - Let mass(i) = estimated mass between v(i) and v(i+1).
 * - Let true_mass(i) be the true mass between the values v(i) and v(i+1) derived
 *   from the hypothetical sorted stream of all N values.
 * - Let eps = get_normalized_rank_error(true).
 * - Then mass(i) - eps ≤ true_mass(i) ≤ mass(i) + eps with a confidence of 99%.
 * - r(m+1) includes the mass of all points larger than vm.
 *

 * A get_CDF(...) query has the following guarantees:
 * - Let {r1, r2, ..., r(m+1)} = get_CDF(v1, v2, ..., vm) where v1, v2, ..., vm are values
 *   between the min and max values of the input stream.
 * - Let mass(i) = r(i+1) - r(i).
 * - Let true_mass(i) be the true mass between the true ranks of v(i) and v(i+1) derived
 *   from the hypothetical sorted stream of all N values.
 * - Let eps = get_normalized_rank_error(true).
 * - Then mass(i) - eps ≤ true_mass(i) ≤ mass(i) + eps with a confidence of 99%.
 * - 1 - r(m+1) includes the mass of all points larger than vm.
 *

 * From the above, it might seem like we could make some estimates to bound the
 * value returned from a call to get_quantile(). The sketch, however, does not
 * let us derive error bounds or confidences around values. Because errors are independent, we
 * can approximately bracket a value as shown below, but there are no error estimates available.
 * Additionally, the interval may be quite large for certain distributions.
 * - Let v = get_quantile(r), the estimated quantile value of rank r.
 * - Let eps = get_normalized_rank_error(false).
 * - Let vlo = the estimated quantile value of rank (r - eps).
 * - Let vhi = the estimated quantile value of rank (r + eps).
 * - Then vlo ≤ v ≤ vhi, with 99% confidence.
 *
 *
 * author Kevin Lang
 * author Alexander Saydakov
 * author Lee Rhodes
 */

// this should work for fixed size types, but needs to be specialized for other types
template <typename T>
void serialize_items(std::ostream& os, const T* items, unsigned num) {
  os.write((char*)items, sizeof(T) * num);
}

// this should work for fixed size types, but needs to be specialized for other types
template <typename T>
void deserialize_items(std::istream& is, T* items, unsigned num) {
  is.read((char*)items, sizeof(T) * num);
}

// this should work for fixed size types, but needs to be specialized for other types
template <typename T>
size_t serialize_items(char* ptr, const T* items, unsigned num) {
  memcpy(ptr, items, sizeof(T) * num);
  return sizeof(T) * num;
}

// this should work for fixed size types, but needs to be specialized for other types
template <typename T>
size_t deserialize_items(const char* ptr, T* items, unsigned num) {
  memcpy(items, ptr, sizeof(T) * num);
  return sizeof(T) * num;
}

typedef std::unique_ptr<void, std::function<void(void*)>> ptr_with_deleter;

template <typename T, typename A = std::allocator<T>>
class kll_sketch {
  typedef typename A::template rebind<T>::other AllocT;
  typedef typename A::template rebind<uint32_t>::other AllocU32;

  public:
    static const uint8_t DEFAULT_M = 8;
    static const uint16_t DEFAULT_K = 200;
    static const uint16_t MIN_K = DEFAULT_M;
    static const uint16_t MAX_K = (1 << 16) - 1; // serialized as an uint16_t

    explicit kll_sketch(uint16_t k = DEFAULT_K) : alloc_t(AllocT()), alloc_u32(AllocU32()) {
      if (k < MIN_K or k > MAX_K) {
        throw std::invalid_argument("K must be >= " + std::to_string(MIN_K) + " and <= " + std::to_string(MAX_K) + ": " + std::to_string(k));
      }
      k_ = k;
      m_ = DEFAULT_M;
      min_k_ = k;
      n_ = 0;
      num_levels_ = 1;
      levels_size_ = 2;
      levels_ = new (alloc_u32.allocate(2)) uint32_t[2] {k_, k_};
      items_size_ = k_;
      items_ = alloc_t.allocate(items_size_);
      for (unsigned i = 0; i < items_size_; i++) alloc_t.construct(&items_[i], T());
      if (std::is_floating_point<T>::value) {
        min_value_ = std::numeric_limits<T>::quiet_NaN();
        max_value_ = std::numeric_limits<T>::quiet_NaN();
      }
      is_level_zero_sorted_ = false;
    }

    kll_sketch(const kll_sketch& other) : alloc_t(AllocT()), alloc_u32(AllocU32()) {
      k_ = other.k_;
      m_ = other.m_;
      min_k_ = other.min_k_;
      n_ = other.n_;
      num_levels_ = other.num_levels_;
      levels_size_ = other.levels_size_;
      levels_ = alloc_u32.allocate(levels_size_);
      std::copy(&other.levels_[0], &other.levels_[levels_size_], levels_);
      items_size_ = other.items_size_;
      items_ = alloc_t.allocate(items_size_);
      for (unsigned i = 0; i < items_size_; i++) alloc_t.construct(&items_[i], other.items_[i]);
      min_value_ = other.min_value_;
      max_value_ = other.max_value_;
      is_level_zero_sorted_ = other.is_level_zero_sorted_;
    }

    kll_sketch& operator=(kll_sketch other) {
      std::swap(k_, other.k_);
      std::swap(m_, other.m_);
      std::swap(min_k_, other.min_k_);
      std::swap(n_, other.n_);
      std::swap(num_levels_, other.num_levels_);
      std::swap(levels_size_, other.levels_size_);
      std::swap(levels_, other.levels_);
      std::swap(items_size_, other.items_size_);
      std::swap(items_, other.items_);
      std::swap(min_value_, other.min_value_);
      std::swap(max_value_, other.max_value_);
      std::swap(is_level_zero_sorted_, other.is_level_zero_sorted_);
      return *this;
    }

    ~kll_sketch() {
      alloc_u32.deallocate(levels_, levels_size_);
      for (unsigned i = 0; i < items_size_; i++) alloc_t.destroy(&items_[i]);
      alloc_t.deallocate(items_, items_size_);
    }

    void update(const T& value) {
      if (is_empty()) {
        min_value_ = value;
        max_value_ = value;
      } else {
        if (value < min_value_) min_value_ = value;
        if (max_value_ < value) max_value_ = value;
      }
      if (levels_[0] == 0) compress_while_updating();
      n_++;
      is_level_zero_sorted_ = false;
      const uint32_t next_pos(levels_[0] - 1);
      levels_[0] = next_pos;
      items_[next_pos] = value;
    }

    void merge(const kll_sketch& other) {
      if (other.is_empty()) return;
      if (m_ != other.m_) {
        throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
      }
      const uint64_t final_n(n_ + other.n_);
      for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
        update(other.items_[i]);
      }
      if (other.num_levels_ >= 2) {
        merge_higher_levels(other, final_n);
      }
      n_ = final_n;
      if (other.is_estimation_mode()) {
        min_k_ = std::min(min_k_, other.min_k_);
      }
      if (is_empty() or other.min_value_ < min_value_) min_value_ = other.min_value_;
      if (is_empty() or max_value_ < other.max_value_) max_value_ = other.max_value_;
      assert_correct_total_weight();
    }

    bool is_empty() const {
      return n_ == 0;
    }

    uint64_t get_n() const {
      return n_;
    }

    uint32_t get_num_retained() const {
      return levels_[num_levels_] - levels_[0];
    }

    bool is_estimation_mode() const {
      return num_levels_ > 1;
    }

    T get_min_value() const {
      if (is_empty()) {
        if (std::is_floating_point<T>::value) {
          return std::numeric_limits<T>::quiet_NaN();
        }
        throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
      }
      return min_value_;
    }

    T get_max_value() const {
      if (is_empty()) {
        if (std::is_floating_point<T>::value) {
          return std::numeric_limits<T>::quiet_NaN();
        }
        throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
      }
      return max_value_;
    }

    T get_quantile(double fraction) const {
      if (is_empty()) {
        if (std::is_floating_point<T>::value) {
          return std::numeric_limits<T>::quiet_NaN();
        }
        throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
      }
      if (fraction == 0.0) return min_value_;
      if (fraction == 1.0) return max_value_;
      if ((fraction < 0.0) or (fraction > 1.0)) {
        throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
      }
      // has side effect of sorting level zero if needed
      auto quantile_calculator(const_cast<kll_sketch*>(this)->get_quantile_calculator());
      return quantile_calculator->get_quantile(fraction);
    }

    std::unique_ptr<T[]> get_quantiles(const double* fractions, uint32_t size) const {
      if (is_empty()) { return nullptr; }
      std::unique_ptr<kll_quantile_calculator<T>> quantile_calculator;
      std::unique_ptr<T[]> quantiles(new T[size]);
      for (uint32_t i = 0; i < size; i++) {
        const double fraction = fractions[i];
        if ((fraction < 0.0) or (fraction > 1.0)) {
          throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
        }
        if (fraction == 0.0) quantiles[i] = min_value_;
        else if (fraction == 1.0) quantiles[i] = max_value_;
        else {
          if (!quantile_calculator) {
            // has side effect of sorting level zero if needed
            quantile_calculator = const_cast<kll_sketch*>(this)->get_quantile_calculator();
          }
          quantiles[i] = quantile_calculator->get_quantile(fraction);
        }
      }
      return std::move(quantiles);
    }

    double get_rank(const T& value) const {
      if (is_empty()) return std::numeric_limits<double>::quiet_NaN();
      uint8_t level(0);
      uint64_t weight(1);
      uint64_t total(0);
      while (level < num_levels_) {
        const auto from_index(levels_[level]);
        const auto to_index(levels_[level + 1]); // exclusive
        for (uint32_t i = from_index; i < to_index; i++) {
          if (items_[i] < value) {
            total += weight;
          } else if ((level > 0) or is_level_zero_sorted_) {
            break; // levels above 0 are sorted, no point comparing further
          }
        }
        level++;
        weight *= 2;
      }
      return (double) total / n_;
    }

    std::unique_ptr<double[]> get_PMF(const T* split_points, uint32_t size) const {
      return get_PMF_or_CDF(split_points, size, false);
    }

    std::unique_ptr<double[]> get_CDF(const T* split_points, uint32_t size) const {
      return get_PMF_or_CDF(split_points, size, true);
    }

    double get_normalized_rank_error(bool pmf) const {
      return get_normalized_rank_error(min_k_, pmf);
    }

    // this may need to be specialized to return correct size if sizeof(T) does not match the actual serialized size of an item
    // this method is for the user's convenience to predict the sketch size before serialization
    // and is not used in the serialization and deserialization code
    // predicting the size before serialization may not make sense if the item type is not of a fixed size (like string)
    uint32_t get_serialized_size_bytes() const {
      if (is_empty()) { return EMPTY_SIZE_BYTES; }
      return get_serialized_size_bytes(num_levels_, get_num_retained(), get_sizeof_item());
    }

    // this may need to be specialized to return correct size if sizeof(T) does not match the actual serialized size of an item
    // this method is for the user's convenience to predict the sketch size before serialization
    // and is not used in the serialization and deserialization code
    // predicting the size before serialization may not make sense if the item type is not of a fixed size (like string)
    static uint32_t get_sizeof_item() {
      return sizeof(T);
    }

    // this may need to be specialized to return correct size if sizeof(T) does not match the actual serialized size of an item
    // this method is for the user's convenience to predict the sketch size before serialization
    // and is not used in the serialization and deserialization code
    // predicting the size before serialization may not make sense if the item type is not of a fixed size (like string)
    static uint32_t get_max_serialized_size_bytes(uint16_t k, uint64_t n) {
      const uint8_t num_levels = kll_helper::ub_on_num_levels(n);
      const uint32_t max_capacity = kll_helper::compute_total_capacity(k, DEFAULT_M, num_levels);
      const uint32_t max_num_items = n < max_capacity ? n : max_capacity;
      return get_serialized_size_bytes(num_levels, max_num_items, get_sizeof_item());
    }

    void serialize(std::ostream& os) const {
      const bool is_single_item = n_ == 1;
      const uint8_t preamble_ints(is_empty() or is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
      os.write((char*)&preamble_ints, sizeof(preamble_ints));
      const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
      os.write((char*)&serial_version, sizeof(serial_version));
      const uint8_t family(FAMILY);
      os.write((char*)&family, sizeof(family));
      const uint8_t flags_byte(
          (is_empty() ? 1 << flags::IS_EMPTY : 0)
        | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
        | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
      );
      os.write((char*)&flags_byte, sizeof(flags_byte));
      os.write((char*)&k_, sizeof(k_));
      os.write((char*)&m_, sizeof(m_));
      const uint8_t unused(0);
      os.write((char*)&unused, sizeof(unused));
      if (is_empty()) return;
      if (!is_single_item) {
        os.write((char*)&n_, sizeof(n_));
        os.write((char*)&min_k_, sizeof(min_k_));
        os.write((char*)&num_levels_, sizeof(num_levels_));
        os.write((char*)&unused, sizeof(unused));
        os.write((char*)levels_, sizeof(levels_[0]) * num_levels_);
        serialize_items(os, &min_value_, 1);
        serialize_items(os, &max_value_, 1);
      }
      serialize_items(os, &items_[levels_[0]], get_num_retained());
    }

    std::pair<ptr_with_deleter, const size_t> serialize(unsigned header_size_bytes = 0) const {
      const bool is_single_item = n_ == 1;
      const size_t size = header_size_bytes + get_serialized_size_bytes();
      typedef typename A::template rebind<char>::other AllocChar;
      AllocChar allocator;
      ptr_with_deleter data_ptr(
        static_cast<void*>(allocator.allocate(size)),
        [size](void* ptr) {
          AllocChar allocator;
          allocator.deallocate(static_cast<char*>(ptr), size);
        }
      );
      char* ptr = static_cast<char*>(data_ptr.get()) + header_size_bytes;
      const uint8_t preamble_ints(is_empty() or is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
      ptr += copy_to_mem(ptr, &preamble_ints, sizeof(preamble_ints));
      const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
      ptr += copy_to_mem(ptr, &serial_version, sizeof(serial_version));
      const uint8_t family(FAMILY);
      ptr += copy_to_mem(ptr, &family, sizeof(family));
      const uint8_t flags_byte(
          (is_empty() ? 1 << flags::IS_EMPTY : 0)
        | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
        | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
      );
      ptr += copy_to_mem(ptr, &flags_byte, sizeof(flags_byte));
      ptr += copy_to_mem(ptr, &k_, sizeof(k_));
      ptr += copy_to_mem(ptr, &m_, sizeof(m_));
      const uint8_t unused(0);
      ptr += copy_to_mem(ptr, &unused, sizeof(unused));
      if (!is_empty()) {
        if (!is_single_item) {
          ptr += copy_to_mem(ptr, &n_, sizeof(n_));
          ptr += copy_to_mem(ptr, &min_k_, sizeof(min_k_));
          ptr += copy_to_mem(ptr, &num_levels_, sizeof(num_levels_));
          ptr += copy_to_mem(ptr, &unused, sizeof(unused));
          ptr += copy_to_mem(ptr, levels_, sizeof(levels_[0]) * num_levels_);
          ptr += serialize_items(ptr, &min_value_, 1);
          ptr += serialize_items(ptr, &max_value_, 1);
        }
        ptr += serialize_items(ptr, &items_[levels_[0]], get_num_retained());
      }
      if (ptr != static_cast<char*>(data_ptr.get()) + size) throw std::logic_error("serialized size mismatch");
      return std::make_pair(std::move(data_ptr), size);
    }

    static std::unique_ptr<kll_sketch<T, A>, std::function<void(kll_sketch<T, A>*)>> deserialize(std::istream& is) {
      uint8_t preamble_ints;
      is.read((char*)&preamble_ints, sizeof(preamble_ints));
      uint8_t serial_version;
      is.read((char*)&serial_version, sizeof(serial_version));
      uint8_t family_id;
      is.read((char*)&family_id, sizeof(family_id));
      uint8_t flags_byte;
      is.read((char*)&flags_byte, sizeof(flags_byte));
      uint16_t k;
      is.read((char*)&k, sizeof(k));
      uint8_t m;
      is.read((char*)&m, sizeof(m));
      uint8_t unused;
      is.read((char*)&unused, sizeof(unused));
      check_m(m);
      check_preamble_ints(preamble_ints, flags_byte);
      check_serial_version(serial_version);
      check_family_id(family_id);
      typedef typename A::template rebind<kll_sketch<T, A>>::other AA;
      AA allocator;
      const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
      std::unique_ptr<kll_sketch<T, A>, std::function<void(kll_sketch<T, A>*)>> sketch_ptr(
        is_empty ? new (allocator.allocate(1)) kll_sketch<T, A>(k)
                 : new (allocator.allocate(1)) kll_sketch<T, A>(k, flags_byte, is),
        [](kll_sketch<T, A>* s) {
          AA a;
          a.deallocate(s, 1);
        }
      );
      return std::move(sketch_ptr);
    }

    static std::unique_ptr<kll_sketch<T, A>, std::function<void(kll_sketch<T, A>*)>> deserialize(const void* bytes, size_t size) {
      const char* ptr = static_cast<const char*>(bytes);
      uint8_t preamble_ints;
      ptr += copy_from_mem(ptr, &preamble_ints, sizeof(preamble_ints));
      uint8_t serial_version;
      ptr += copy_from_mem(ptr, &serial_version, sizeof(serial_version));
      uint8_t family_id;
      ptr += copy_from_mem(ptr, &family_id, sizeof(family_id));
      uint8_t flags_byte;
      ptr += copy_from_mem(ptr, &flags_byte, sizeof(flags_byte));
      uint16_t k;
      ptr += copy_from_mem(ptr, &k, sizeof(k));
      uint8_t m;
      ptr += copy_from_mem(ptr, &m, sizeof(m));
      check_m(m);
      check_preamble_ints(preamble_ints, flags_byte);
      check_serial_version(serial_version);
      check_family_id(family_id);
      typedef typename A::template rebind<kll_sketch<T, A>>::other AA;
      AA allocator;
      const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
      std::unique_ptr<kll_sketch<T, A>, void(*)(kll_sketch<T, A>*)> sketch_ptr(
        is_empty ? new (allocator.allocate(1)) kll_sketch<T, A>(k)
                 : new (allocator.allocate(1)) kll_sketch<T, A>(k, flags_byte, bytes, size),
        [](kll_sketch<T, A>* s) {
          AA allocator;
          allocator.deallocate(s, 1);
        }
      );
      return std::move(sketch_ptr);
    }

    /*
     * Gets the normalized rank error given k and pmf.
     * k - the configuration parameter
     * pmf - if true, returns the "double-sided" normalized rank error for the get_PMF() function.
     * Otherwise, it is the "single-sided" normalized rank error for all the other queries.
     * Constants were derived as the best fit to 99 percentile empirically measured max error in thousands of trials
     */
    static double get_normalized_rank_error(uint16_t k, bool pmf) {
      return pmf ? 2.446 / pow(k, 0.9433) : 2.296 / pow(k, 0.9723);
    }

    template <typename TT, typename AA>
    friend std::ostream& operator<<(std::ostream& os, kll_sketch<TT, AA> const& sketch);

#ifdef KLL_VALIDATION
    uint8_t get_num_levels() { return num_levels_; }
    uint32_t* get_levels() { return levels_; }
    T* get_items() { return items_; }
#endif

  private:
    /* Serialized sketch layout:
     *  Adr:
     *      ||    7    |   6   |    5   |    4   |    3   |    2    |    1   |      0       |
     *  0   || unused  |   M   |--------K--------|  Flags |  FamID  | SerVer | PreambleInts |
     *      ||    15   |  14   |   13   |   12   |   11   |   10    |    9   |      8       |
     *  1   ||-----------------------------------N------------------------------------------|
     *      ||    23   |  22   |   21   |   20   |   19   |   18    |   17   |      16      |
     *  2   ||---------------data----------------|-unused-|numLevels|-------min K-----------|
     */

    static const size_t EMPTY_SIZE_BYTES = 8;
    static const size_t DATA_START_SINGLE_ITEM = 8;
    static const size_t DATA_START = 20;

    static const uint8_t SERIAL_VERSION_1 = 1;
    static const uint8_t SERIAL_VERSION_2 = 2;
    static const uint8_t FAMILY = 15;

    enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };

    static const uint8_t PREAMBLE_INTS_SHORT = 2; // for empty and single item
    static const uint8_t PREAMBLE_INTS_FULL = 5;

    uint16_t k_;
    uint8_t m_; // minimum buffer "width"
    uint16_t min_k_; // for error estimation after merging with different k
    uint64_t n_;
    uint8_t num_levels_;
    uint32_t* levels_;
    uint8_t levels_size_;
    T* items_;
    uint32_t items_size_;
    T min_value_;
    T max_value_;
    bool is_level_zero_sorted_;
    AllocT alloc_t;
    AllocU32 alloc_u32;

    // for deserialization
    // the common part of the preamble was read and compatibility checks were done
    kll_sketch(uint16_t k, uint8_t flags_byte, std::istream& is) : alloc_t(AllocT()), alloc_u32(AllocU32()) {
      k_ = k;
      m_ = DEFAULT_M;
      const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
      if (is_single_item) {
        n_ = 1;
        min_k_ = k_;
        num_levels_ = 1;
      } else {
        is.read((char*)&n_, sizeof(n_));
        is.read((char*)&min_k_, sizeof(min_k_));
        is.read((char*)&num_levels_, sizeof(num_levels_));
        uint8_t unused;
        is.read((char*)&unused, sizeof(unused));
      }
      levels_ = alloc_u32.allocate(num_levels_ + 1);
      levels_size_ = num_levels_ + 1;
      const uint32_t capacity(kll_helper::compute_total_capacity(k_, m_, num_levels_));
      if (is_single_item) {
        levels_[0] = capacity - 1;
      } else {
        // the last integer in levels_ is not serialized because it can be derived
        is.read((char*)levels_, sizeof(levels_[0]) * num_levels_);
      }
      levels_[num_levels_] = capacity;
      if (!is_single_item) {
        deserialize_items(is, &min_value_, 1);
        deserialize_items(is, &max_value_, 1);
      }
      items_ = alloc_t.allocate(capacity);
      items_size_ = capacity;
      for (unsigned i = 0; i < items_size_; i++) alloc_t.construct(&items_[i], T());
      const auto num_items(levels_[num_levels_] - levels_[0]);
      deserialize_items(is, &items_[levels_[0]], num_items);
      if (is_single_item) {
        min_value_ = items_[levels_[0]];
        max_value_ = items_[levels_[0]];
      }
      is_level_zero_sorted_ = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
    }

    // for deserialization
    // the common part of the preamble was read and compatibility checks were done
    kll_sketch(uint16_t k, uint8_t flags_byte, const void* bytes, size_t size) : alloc_t(AllocT()), alloc_u32(AllocU32()) {
      k_ = k;
      m_ = DEFAULT_M;
      const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
      const char* ptr = static_cast<const char*>(bytes) + DATA_START_SINGLE_ITEM;
      if (is_single_item) {
        n_ = 1;
        min_k_ = k_;
        num_levels_ = 1;
      } else {
        ptr += copy_from_mem(ptr, &n_, sizeof(n_));
        ptr += copy_from_mem(ptr, &min_k_, sizeof(min_k_));
        ptr += copy_from_mem(ptr, &num_levels_, sizeof(num_levels_));
        ptr++; // skip unused byte
      }
      levels_ = alloc_u32.allocate(num_levels_ + 1);
      levels_size_ = num_levels_ + 1;
      const uint32_t capacity(kll_helper::compute_total_capacity(k_, m_, num_levels_));
      if (is_single_item) {
        levels_[0] = capacity - 1;
      } else {
        // the last integer in levels_ is not serialized because it can be derived
        ptr += copy_from_mem(ptr, levels_, sizeof(levels_[0]) * num_levels_);
      }
      levels_[num_levels_] = capacity;
      if (!is_single_item) {
        ptr += deserialize_items(ptr, &min_value_, 1);
        ptr += deserialize_items(ptr, &max_value_, 1);
      }
      items_ = alloc_t.allocate(capacity);
      items_size_ = capacity;
      for (unsigned i = 0; i < items_size_; i++) alloc_t.construct(&items_[i], T());
      const auto num_items(levels_[num_levels_] - levels_[0]);
      ptr += deserialize_items(ptr, &items_[levels_[0]], num_items);
      if (is_single_item) {
        min_value_ = items_[levels_[0]];
        max_value_ = items_[levels_[0]];
      }
      is_level_zero_sorted_ = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
      if (ptr != static_cast<const char*>(bytes) + size) throw std::logic_error("deserialized size mismatch");
    }

    // The following code is only valid in the special case of exactly reaching capacity while updating.
    // It cannot be used while merging, while reducing k, or anything else.
    void compress_while_updating(void) {
      const uint8_t level(find_level_to_compact());

      // It is important to add the new top level right here. Be aware that this operation
      // grows the buffer and shifts the data and also the boundaries of the data and grows the
      // levels array and increments num_levels_
      if (level == (num_levels_ - 1)) {
        add_empty_top_level_to_completely_full_sketch();
      }

      const uint32_t raw_beg(levels_[level]);
      const uint32_t raw_lim(levels_[level + 1]);
      // +2 is OK because we already added a new top level if necessary
      const uint32_t pop_above(levels_[level + 2] - raw_lim);
      const uint32_t raw_pop(raw_lim - raw_beg);
      const bool odd_pop(kll_helper::is_odd(raw_pop));
      const uint32_t adj_beg(odd_pop ? raw_beg + 1 : raw_beg);
      const uint32_t adj_pop(odd_pop ? raw_pop - 1 : raw_pop);
      const uint32_t half_adj_pop(adj_pop / 2);

      // level zero might not be sorted, so we must sort it if we wish to compact it
      if (level == 0) {
        std::sort(&items_[adj_beg], &items_[adj_beg + adj_pop]);
      }
      if (pop_above == 0) {
        kll_helper::randomly_halve_up(items_, adj_beg, adj_pop);
      } else {
        kll_helper::randomly_halve_down(items_, adj_beg, adj_pop);
        kll_helper::merge_sorted_arrays(items_, adj_beg, half_adj_pop, items_, raw_lim, pop_above, items_, adj_beg + half_adj_pop);
      }
      levels_[level + 1] -= half_adj_pop; // adjust boundaries of the level above
      if (odd_pop) {
        levels_[level] = levels_[level + 1] - 1; // the current level now contains one item
        items_[levels_[level]] = items_[raw_beg]; // namely this leftover guy
      } else {
        levels_[level] = levels_[level + 1]; // the current level is now empty
      }

      // verify that we freed up half_adj_pop array slots just below the current level
      assert (levels_[level] == (raw_beg + half_adj_pop));

      // finally, we need to shift up the data in the levels below
      // so that the freed-up space can be used by level zero
      if (level > 0) {
        const uint32_t amount(raw_beg - levels_[0]);
        std::move_backward(&items_[levels_[0]], &items_[levels_[0] + amount], &items_[levels_[0] + half_adj_pop + amount]);
        for (uint8_t lvl = 0; lvl < level; lvl++) {
          levels_[lvl] += half_adj_pop;
        }
      }
    }

    uint8_t find_level_to_compact() const {
      uint8_t level(0);
      while (true) {
        assert (level < num_levels_);
        const uint32_t pop(levels_[level + 1] - levels_[level]);
        const uint32_t cap(kll_helper::level_capacity(k_, num_levels_, level, m_));
        if (pop >= cap) {
          return level;
        }
        level++;
      }
    }

    void add_empty_top_level_to_completely_full_sketch() {
      const uint32_t cur_total_cap(levels_[num_levels_]);

      // make sure that we are following a certain growth scheme
      assert (levels_[0] == 0);
      assert (items_size_ == cur_total_cap);

      // note that merging MIGHT over-grow levels_, in which case we might not have to grow it here
      if (levels_size_ < (num_levels_ + 2)) {
        uint32_t* new_levels(alloc_u32.allocate(num_levels_ + 2));
        std::copy(&levels_[0], &levels_[levels_size_], new_levels);
        alloc_u32.deallocate(levels_, levels_size_);
        levels_ = new_levels;
        levels_size_ = num_levels_ + 2;
      }

      const uint32_t delta_cap(kll_helper::level_capacity(k_, num_levels_ + 1, 0, m_));
      const uint32_t new_total_cap(cur_total_cap + delta_cap);

      T* new_buf(alloc_t.allocate(new_total_cap));
      for (unsigned i = 0; i < new_total_cap; i++) alloc_t.construct(&new_buf[i], T());

      // move (and shift) the current data into the new buffer
      std::move(&items_[levels_[0]], &items_[levels_[0] + cur_total_cap], &new_buf[levels_[0] + delta_cap]);
      for (unsigned i = 0; i < items_size_; i++) alloc_t.destroy(&items_[i]);
      alloc_t.deallocate(items_, items_size_);
      items_ = new_buf;
      items_size_ = new_total_cap;

      // this loop includes the old "extra" index at the top
      for (uint8_t i = 0; i <= num_levels_; i++) {
        levels_[i] += delta_cap;
      }

      assert (levels_[num_levels_] == new_total_cap);

      num_levels_++;
      levels_[num_levels_] = new_total_cap; // initialize the new "extra" index at the top
    }

    void sort_level_zero() {
      if (!is_level_zero_sorted_) {
        std::sort(&items_[levels_[0]], &items_[levels_[1]]);
        is_level_zero_sorted_ = true;
      }
    }

    std::unique_ptr<kll_quantile_calculator<T>> get_quantile_calculator() {
      sort_level_zero();
      std::unique_ptr<kll_quantile_calculator<T>> quantile_calculator(new kll_quantile_calculator<T>(items_, levels_, num_levels_, n_));
      return std::move(quantile_calculator);
    }

    std::unique_ptr<double[]> get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const {
      if (is_empty()) return nullptr;
      kll_helper::validate_values(split_points, size);
      std::unique_ptr<double[]> buckets(new double[size + 1]);
      std::fill(&buckets.get()[0], &buckets.get()[size + 1], 0);
      uint8_t level(0);
      uint64_t weight(1);
      while (level < num_levels_) {
        const auto from_index = levels_[level];
        const auto to_index = levels_[level + 1]; // exclusive
        if ((level == 0) and !is_level_zero_sorted_) {
          increment_buckets_unsorted_level(from_index, to_index, weight, split_points, size, buckets.get());
        } else {
          increment_buckets_sorted_level(from_index, to_index, weight, split_points, size, buckets.get());
        }
        level++;
        weight *= 2;
      }
      // normalize and, if CDF, convert to cumulative
      if (is_CDF) {
        double subtotal = 0;
        for (uint32_t i = 0; i <= size; i++) {
          subtotal += buckets[i];
          buckets[i] = subtotal / n_;
        }
      } else {
        for (uint32_t i = 0; i <= size; i++) {
          buckets[i] /= n_;
        }
      }
      return std::move(buckets);
    }

    void increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight, const T* split_points, uint32_t size, double* buckets) const {
      for (uint32_t i = from_index; i < to_index; i++) {
        uint32_t j;
        for (j = 0; j < size; j++) {
          if (items_[i] < split_points[j]) {
            break;
          }
        }
        buckets[j] += weight;
      }
    }

    void increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight, const T* split_points, uint32_t size, double* buckets) const {
      uint32_t i = from_index;
      uint32_t j = 0;
      while ((i < to_index) and (j < size)) {
        if (items_[i] < split_points[j]) {
          buckets[j] += weight; // this sample goes into this bucket
          i++; // move on to next sample and see whether it also goes into this bucket
        } else {
          j++; // no more samples for this bucket
        }
      }
      // now either i == to_index (we are out of samples), or
      // j == size (we are out of buckets, but there are more samples remaining)
      // we only need to do something in the latter case
      if (j == size) {
        buckets[j] += weight * (to_index - i);
      }
    }

    void merge_higher_levels(const kll_sketch& other, uint64_t final_n) {
      const uint32_t tmp_space_needed(get_num_retained() + other.get_num_retained_above_level_zero());
      const std::unique_ptr<T[]> workbuf(new T[tmp_space_needed]);
      const uint8_t ub = kll_helper::ub_on_num_levels(final_n);
      const std::unique_ptr<uint32_t[]> worklevels(new uint32_t[ub + 2]); // ub+1 does not work
      const std::unique_ptr<uint32_t[]> outlevels(new uint32_t[ub + 2]);

      const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);

      populate_work_arrays(other, workbuf.get(), worklevels.get(), provisional_num_levels);

      // notice that workbuf is being used as both the input and output here
      const kll_helper::compress_result result = kll_helper::general_compress(k_, m_, provisional_num_levels, workbuf.get(), worklevels.get(), workbuf.get(), outlevels.get(), is_level_zero_sorted_);
      const uint8_t final_num_levels = result.final_num_levels;
      const uint32_t final_capacity = result.final_capacity;
      const uint32_t final_pop = result.final_pop;
      assert (final_num_levels <= ub); // can sometimes be much bigger

      // now we need to transfer the results back into the "self" sketch
      if (final_capacity != items_size_) {
        for (unsigned i = 0; i < items_size_; i++) alloc_t.destroy(&items_[i]);
        alloc_t.deallocate(items_, items_size_);
        items_ = alloc_t.allocate(final_capacity);
        items_size_ = final_capacity;
        for (unsigned i = 0; i < items_size_; i++) alloc_t.construct(&items_[i], T());
      }
      const uint32_t free_space_at_bottom = final_capacity - final_pop;
      std::move(&workbuf[outlevels[0]], &workbuf[outlevels[0] + final_pop], &items_[free_space_at_bottom]);
      const uint32_t the_shift(free_space_at_bottom - outlevels[0]);

      if (levels_size_ < (final_num_levels + 1)) {
        alloc_u32.deallocate(levels_, levels_size_);
        levels_ = alloc_u32.allocate(final_num_levels + 1);
        levels_size_ = final_num_levels + 1;
      }

      for (uint8_t lvl = 0; lvl < (final_num_levels + 1); lvl++) { // includes the "extra" index
        levels_[lvl] = outlevels[lvl] + the_shift;
      }
      num_levels_ = final_num_levels;
    }

    void populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
      worklevels[0] = 0;

      // Note: the level zero data from "other" was already inserted into "self"
      const uint32_t self_pop_zero(safe_level_size(0));
      std::move(&items_[levels_[0]], &items_[levels_[0] + self_pop_zero], &workbuf[worklevels[0]]);
      worklevels[1] = worklevels[0] + self_pop_zero;

      for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
        const uint32_t self_pop = safe_level_size(lvl);
      const uint32_t other_pop = other.safe_level_size(lvl);
      worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;

      if ((self_pop > 0) and (other_pop == 0)) {
        std::move(&items_[levels_[lvl]], &items_[levels_[lvl] + self_pop], &workbuf[worklevels[lvl]]);
      } else if ((self_pop == 0) and (other_pop > 0)) {
        std::copy(&other.items_[other.levels_[lvl]], &other.items_[other.levels_[lvl] + other_pop], &workbuf[worklevels[lvl]]);
      } else if ((self_pop > 0) and (other_pop > 0)) {
        kll_helper::merge_sorted_arrays(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
      }
    }
  }

  void assert_correct_total_weight() const {
    const uint64_t total(kll_helper::sum_the_sample_weights(num_levels_, levels_));
    assert (total == n_);
  }

  uint32_t safe_level_size(uint8_t level) const {
    if (level >= num_levels_) return 0;
    return levels_[level + 1] - levels_[level];
  }

  uint32_t get_num_retained_above_level_zero() const {
    if (num_levels_ == 1) return 0;
    return levels_[num_levels_] - levels_[1];
  }

  static uint32_t get_serialized_size_bytes(uint8_t num_levels, uint32_t num_items, uint32_t sizeof_item) {
    if (num_levels == 1 and num_items == 1) {
      return DATA_START_SINGLE_ITEM + sizeof_item;
    }
    // the last integer in the levels_ array is not serialized because it can be derived
    // +2 items for min and max
    return DATA_START + (num_levels * sizeof(uint32_t)) + ((num_items + 2) * sizeof_item);
  }

  static void check_m(uint8_t m) {
    if (m != DEFAULT_M) {
      throw std::invalid_argument("Possible corruption: M must be " + std::to_string(DEFAULT_M)
          + ": " + std::to_string(m));
    }
  }

  static void check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte) {
    const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
    const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM));
    if (is_empty or is_single_item) {
      if (preamble_ints != PREAMBLE_INTS_SHORT) {
        throw std::invalid_argument("Possible corruption: preamble ints must be "
            + std::to_string(PREAMBLE_INTS_SHORT) + " for an empty or single item sketch: "
            + std::to_string(preamble_ints));
      }
    } else {
      if (preamble_ints != PREAMBLE_INTS_FULL) {
        throw std::invalid_argument("Possible corruption: preamble ints must be "
            + std::to_string(PREAMBLE_INTS_FULL) + " for a sketch with more than one item: "
            + std::to_string(preamble_ints));
      }
    }
  }

  static void check_serial_version(uint8_t serial_version) {
    if (serial_version != SERIAL_VERSION_1 and serial_version != SERIAL_VERSION_2) {
      throw std::invalid_argument("Possible corruption: serial version mismatch: expected "
          + std::to_string(SERIAL_VERSION_1) + " or " + std::to_string(SERIAL_VERSION_2)
          + ", got " + std::to_string(serial_version));
    }
  }

  static void check_family_id(uint8_t family_id) {
    if (family_id != FAMILY) {
      throw std::invalid_argument("Possible corruption: family mismatch: expected "
          + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
    }
  }

  static inline size_t copy_from_mem(const void* src, void* dst, size_t size) {
    memcpy(dst, src, size);
    return size;
  }

  static inline size_t copy_to_mem(void* dst, const void* src, size_t size) {
    memcpy(dst, src, size);
    return size;
  }

};

template<typename T>
std::ostream& operator<<(std::ostream& os, kll_sketch<T> const& sketch) {
  os << "### KLL sketch summary:" << std::endl;
  os << "   K              : " << sketch.k_ << std::endl;
  os << "   min K          : " << sketch.min_k_ << std::endl;
  os << "   M              : " << (unsigned int) sketch.m_ << std::endl;
  os << "   N              : " << sketch.n_ << std::endl;
  os << "   Epsilon        : " << std::setprecision(3) << sketch.get_normalized_rank_error(false) * 100 << "%" << std::endl;
  os << "   Epsilon PMF    : " << sketch.get_normalized_rank_error(true) * 100 << "%" << std::endl;
  os << "   Empty          : " << (sketch.is_empty() ? "true" : "false") << std::endl;
  os << "   Estimation mode: " << (sketch.is_estimation_mode() ? "true" : "false") << std::endl;
  os << "   Levels         : " << (unsigned int) sketch.num_levels_ << std::endl;
  os << "   Sorted         : " << (sketch.is_level_zero_sorted_ ? "true" : "false") << std::endl;
  os << "   Capacity items : " << sketch.items_size_ << std::endl;
  os << "   Retained items : " << sketch.get_num_retained() << std::endl;
  os << "   Storage bytes  : " << sketch.get_serialized_size_bytes() << std::endl;
  using std::to_string; // to allow overriding to_string() for a custom type
  if (!sketch.is_empty()) {
    os << "   Min value      : " << to_string(sketch.get_min_value()) << std::endl;
    os << "   Max value      : " << to_string(sketch.get_max_value()) << std::endl;
  }
  os << "### End sketch summary" << std::endl;

  // for debugging
  const bool with_levels(false);
  const bool with_data(false);

  if (with_levels) {
    os << "### KLL sketch levels:" << std::endl;
    os << "   index: nominal capacity, actual size" << std::endl;
    for (uint8_t i = 0; i < sketch.num_levels_; i++) {
      os << "   " << (unsigned int) i << ": "
         << kll_helper::level_capacity(sketch.k_, sketch.num_levels_, i, sketch.m_)
         << ", " << sketch.safe_level_size(i) << std::endl;
    }
    os << "### End sketch levels" << std::endl;
  }

  if (with_data) {
    os << "### KLL sketch data:" << std::endl;
    uint8_t level(0);
    while (level < sketch.num_levels_) {
      const uint32_t from_index = sketch.levels_[level];
      const uint32_t to_index = sketch.levels_[level + 1]; // exclusive
      if (from_index < to_index) {
        os << " level " << (unsigned int) level << ":" << std::endl;
      }
      for (uint32_t i = from_index; i < to_index; i++) {
        os << "   " << to_string(sketch.items_[i]) << std::endl;
      }
      level++;
    }
    os << "### End sketch data" << std::endl;
  }

  return os;
}

} /* namespace datasketches */

#endif