/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef _VAR_OPT_SKETCH_HPP_ #define _VAR_OPT_SKETCH_HPP_ #include "serde.hpp" #include "common_defs.hpp" #include #include namespace datasketches { template using AllocU8 = typename std::allocator_traits::template rebind_alloc; template using vector_u8 = std::vector>; /* * A struct to hold the result of subset sum queries */ struct subset_summary { double lower_bound; double estimate; double upper_bound; double total_sketch_weight; }; template class var_opt_union; // forward declaration /// VarOpt sketch constants namespace var_opt_constants { /// default resize factor const resize_factor DEFAULT_RESIZE_FACTOR = resize_factor::X8; /// maximum value of parameter K const uint32_t MAX_K = ((uint32_t) 1 << 31) - 2; } /** * This sketch samples data from a stream of items. Designed for optimal (minimum) variance when * querying the sketch to estimate subset sums of items matching a provided predicate. Variance * optimal (varopt) sampling is related to reservoir sampling, with improved error bounds for * subset sum estimation. * * author Kevin Lang * author Jon Malkin */ template< typename T, typename A = std::allocator > class var_opt_sketch { public: static const resize_factor DEFAULT_RESIZE_FACTOR = var_opt_constants::DEFAULT_RESIZE_FACTOR; static const uint32_t MAX_K = var_opt_constants::MAX_K; /** * Constructor * @param k sketch size * @param rf resize factor * @param allocator instance of an allocator */ explicit var_opt_sketch(uint32_t k, resize_factor rf = var_opt_constants::DEFAULT_RESIZE_FACTOR, const A& allocator = A()); /** * Copy constructor * @param other sketch to be copied */ var_opt_sketch(const var_opt_sketch& other); /** * Move constructor * @param other sketch to be moved */ var_opt_sketch(var_opt_sketch&& other) noexcept; ~var_opt_sketch(); /** * Copy assignment * @param other sketch to be copied * @return reference to this sketch */ var_opt_sketch& operator=(const var_opt_sketch& other); /** * Move assignment * @param other sketch to be moved * @return reference to this sketch */ var_opt_sketch& operator=(var_opt_sketch&& other); /** * Updates this sketch with the given data item with the given weight. * This method takes an lvalue. * @param item an item from a stream of items * @param weight the weight of the item */ void update(const T& item, double weight = 1.0); /** * Updates this sketch with the given data item with the given weight. * This method takes an rvalue. * @param item an item from a stream of items * @param weight the weight of the item */ void update(T&& item, double weight = 1.0); /** * Returns the configured maximum sample size. * @return configured maximum sample size */ inline uint32_t get_k() const; /** * Returns the length of the input stream. * @return stream length */ inline uint64_t get_n() const; /** * Returns the number of samples currently in the sketch * @return stream length */ inline uint32_t get_num_samples() const; /** * Computes an estimated subset sum from the entire stream for objects matching a given * predicate. Provides a lower bound, estimate, and upper bound using a target of 2 standard * deviations. This is technically a heuristic method and tries to err on the conservative side. * @param predicate a predicate function * @return a subset_summary item with estimate, upper and lower bounds, * and total sketch weight */ template subset_summary estimate_subset_sum(P predicate) const; /** * Returns true if the sketch is empty. * @return empty flag */ inline bool is_empty() const; /** * Resets the sketch to its default, empty state. */ void reset(); /** * Computes size needed to serialize the current state of the sketch. * This version is for fixed-size arithmetic types (integral and floating point). * @param sd instance of a SerDe * @return size in bytes needed to serialize this sketch */ template, typename std::enable_if::value, int>::type = 0> inline size_t get_serialized_size_bytes(const SerDe& sd = SerDe()) const; /** * Computes size needed to serialize the current state of the sketch. * This version is for all other types and can be expensive since every item needs to be looked at. * @param sd instance of a SerDe * @return size in bytes needed to serialize this sketch */ template, typename std::enable_if::value, int>::type = 0> inline size_t get_serialized_size_bytes(const SerDe& sd = SerDe()) const; // This is a convenience alias for users // The type returned by the following serialize method using vector_bytes = vector_u8; /** * This method serializes the sketch as a vector of bytes. * An optional header can be reserved in front of the sketch. * It is a blank space of a given size. * This header is used in Datasketches PostgreSQL extension. * @param header_size_bytes space to reserve in front of the sketch * @param sd instance of a SerDe */ template> vector_bytes serialize(unsigned header_size_bytes = 0, const SerDe& sd = SerDe()) const; /** * This method serializes the sketch into a given stream in a binary form * @param os output stream * @param sd instance of a SerDe */ template> void serialize(std::ostream& os, const SerDe& sd = SerDe()) const; /** * This method deserializes a sketch from a given stream. * @param is input stream * @param sd instance of a SerDe * @param allocator instance of an allocator * @return an instance of a sketch */ template> static var_opt_sketch deserialize(std::istream& is, const SerDe& sd = SerDe(), const A& allocator = A()); /** * This method deserializes a sketch from a given array of bytes. * @param bytes pointer to the array of bytes * @param size the size of the array * @param sd instance of a SerDe * @param allocator instance of an allocator * @return an instance of a sketch */ template> static var_opt_sketch deserialize(const void* bytes, size_t size, const SerDe& sd = SerDe(), const A& allocator = A()); /** * Prints a summary of the sketch. * @return the summary as a string */ string to_string() const; /** * Prints the raw sketch items to a string. Calls items_to_stream() internally. * Only works for type T with a defined * std::ostream& operator<<(std::ostream&, const T&) and * kept separate from to_string() to allow compilation even if * T does not have such an operator defined. * @return a string with the sketch items */ string items_to_string() const; class const_iterator; /** * Iterator pointing to the first item in the sketch. * If the sketch is empty, the returned iterator must not be dereferenced or incremented. * @return iterator pointing to the first item in the sketch */ const_iterator begin() const; /** * Iterator pointing to the past-the-end item in the sketch. * The past-the-end item is the hypothetical item that would follow the last item. * It does not point to any item, and must not be dereferenced or incremented. * @return iterator pointing to the past-the-end item in the sketch */ const_iterator end() const; private: typedef typename std::allocator_traits::template rebind_alloc AllocDouble; typedef typename std::allocator_traits::template rebind_alloc AllocBool; static const uint32_t MIN_LG_ARR_ITEMS = 3; static const uint8_t PREAMBLE_LONGS_EMPTY = 1; static const uint8_t PREAMBLE_LONGS_WARMUP = 3; static const uint8_t PREAMBLE_LONGS_FULL = 4; static const uint8_t SER_VER = 2; static const uint8_t FAMILY_ID = 13; static const uint8_t EMPTY_FLAG_MASK = 4; static const uint8_t GADGET_FLAG_MASK = 128; // Number of standard deviations to use for subset sum error bounds constexpr static const double DEFAULT_KAPPA = 2.0; // TODO: should probably rearrange a bit to minimize gaps once aligned uint32_t k_; // max size of sketch, in items uint32_t h_; // number of items in heap uint32_t m_; // number of items in middle region uint32_t r_; // number of items in reservoir-like region uint64_t n_; // total number of items processed by sketch double total_wt_r_; // total weight of items in reservoir-like area resize_factor rf_; // resize factor uint32_t curr_items_alloc_; // currently allocated array size bool filled_data_; // true if we've explicitly set all entries in data_ A allocator_; T* data_; // stored sampled items double* weights_; // weights for sampled items // The next two fields are hidden from the user because they are part of the state of the // unioning algorithm, NOT part of a varopt sketch, or even of a varopt "gadget" (our name for // the potentially invalid sketch that is maintained by the unioning algorithm). It would make // more sense logically for these fields to be declared in the unioning object (whose entire // purpose is storing the state of the unioning algorithm) but for reasons of programming // convenience we are currently declaring them here. However, that could change in the future. // Following int is: // 1. Zero (for a varopt sketch) // 2. Count of marked items in H region, if part of a unioning algo's gadget uint32_t num_marks_in_h_; // The following array is absent in a varopt sketch, and notionally present in a gadget // (although it really belongs in the unioning object). If the array were to be made explicit, // some additional coding would need to be done to ensure that all of the necessary data motion // occurs and is properly tracked. bool* marks_; // used during deserialization to avoid memory leaks upon errors class items_deleter; class weights_deleter; class marks_deleter; var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget, const A& allocator); var_opt_sketch(uint32_t k, uint32_t h, uint32_t m, uint32_t r, uint64_t n, double total_wt_r, resize_factor rf, uint32_t curr_items_alloc, bool filled_data, std::unique_ptr items, std::unique_ptr weights, uint32_t num_marks_in_h, std::unique_ptr marks, const A& allocator); friend class var_opt_union; var_opt_sketch(const var_opt_sketch& other, bool as_sketch, uint64_t adjusted_n); string items_to_string(bool print_gap) const; // internal-use-only update template inline void update(O&& item, double weight, bool mark); template inline void update_warmup_phase(O&& item, double weight, bool mark); template inline void update_light(O&& item, double weight, bool mark); template inline void update_heavy_r_eq1(O&& item, double weight, bool mark); template inline void update_heavy_general(O&& item, double weight, bool mark); inline double get_tau() const; inline double peek_min() const; inline bool is_marked(uint32_t idx) const; inline uint32_t pick_random_slot_in_r() const; inline uint32_t choose_delete_slot(double wt_cand, uint32_t num_cand) const; inline uint32_t choose_weighted_delete_slot(double wt_cand, uint32_t num_cand) const; template inline void push(O&& item, double wt, bool mark); inline void transition_from_warmup(); inline void convert_to_heap(); inline void restore_towards_leaves(uint32_t slot_in); inline void restore_towards_root(uint32_t slot_in); inline void pop_min_to_m_region(); void grow_candidate_set(double wt_cands, uint32_t num_cands); void decrease_k_by_1(); void strip_marks(); void force_set_k(uint32_t k); // used to resolve union gadget into sketch void downsample_candidate_set(double wt_cands, uint32_t num_cands); inline void swap_values(uint32_t src, uint32_t dst); void grow_data_arrays(); void allocate_data_arrays(uint32_t tgt_size, bool use_marks); // validation static void check_preamble_longs(uint8_t preamble_longs, uint8_t flags); static void check_family_and_serialization_version(uint8_t family_id, uint8_t ser_ver); static uint32_t validate_and_get_target_size(uint32_t preamble_longs, uint32_t k, uint64_t n, uint32_t h, uint32_t r, resize_factor rf); // things to move to common and be shared among sketches static uint32_t get_adjusted_size(uint32_t max_size, uint32_t resize_target); static uint32_t starting_sub_multiple(uint32_t lg_target, uint32_t lg_rf, uint32_t lg_min); static inline double pseudo_hypergeometric_ub_on_p(uint64_t n, uint32_t k, double sampling_rate); static inline double pseudo_hypergeometric_lb_on_p(uint64_t n, uint32_t k, double sampling_rate); static bool is_power_of_2(uint32_t v); static uint32_t to_log_2(uint32_t v); static inline uint32_t next_int(uint32_t max_value); static inline double next_double_exclude_zero(); class iterator; }; template class var_opt_sketch::const_iterator { public: using iterator_category = std::input_iterator_tag; using value_type = std::pair; using difference_type = void; using pointer = const return_value_holder; using reference = const value_type; const_iterator(const const_iterator& other); const_iterator& operator++(); const_iterator& operator++(int); bool operator==(const const_iterator& other) const; bool operator!=(const const_iterator& other) const; reference operator*() const; pointer operator->() const; private: friend class var_opt_sketch; friend class var_opt_union; // default iterator over full sketch const_iterator(const var_opt_sketch& sk, bool is_end); // iterates over only one of the H or R regions // does not apply weight correction const_iterator(const var_opt_sketch& sk, bool is_end, bool use_r_region); bool get_mark() const; const var_opt_sketch* sk_; double cum_r_weight_; // used for weight correction double r_item_wt_; size_t idx_; const size_t final_idx_; }; // non-const iterator for internal use template class var_opt_sketch::iterator { public: using iterator_category = std::input_iterator_tag; using value_type = std::pair; using difference_type = void; using pointer = return_value_holder; using reference = value_type; iterator(const iterator& other); iterator& operator++(); iterator& operator++(int); bool operator==(const iterator& other) const; bool operator!=(const iterator& other) const; reference operator*(); pointer operator->(); private: friend class var_opt_sketch; friend class var_opt_union; // iterates over only one of the H or R region, applying weight correction // if iterating over R region (can correct for numerical precision issues) iterator(const var_opt_sketch& sk, bool is_end, bool use_r_region); bool get_mark() const; const var_opt_sketch* sk_; double cum_r_weight_; // used for weight correction double r_item_wt_; size_t idx_; const size_t final_idx_; }; } // namespace datasketches #include "var_opt_sketch_impl.hpp" #endif // _VAR_OPT_SKETCH_HPP_