/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef FREQUENT_ITEMS_SKETCH_HPP_ #define FREQUENT_ITEMS_SKETCH_HPP_ #include #include #include #include #include #include #include "reverse_purge_hash_map.hpp" #include "serde.hpp" namespace datasketches { /* * Based on Java implementation here: * https://github.com/DataSketches/sketches-core/blob/master/src/main/java/com/yahoo/sketches/frequencies/ItemsSketch.java * author Alexander Saydakov */ enum frequent_items_error_type { NO_FALSE_POSITIVES, NO_FALSE_NEGATIVES }; // for serialization as raw bytes typedef std::unique_ptr> void_ptr_with_deleter; template, typename E = std::equal_to, typename S = serde, typename A = std::allocator> class frequent_items_sketch { public: static const uint64_t USE_MAX_ERROR = 0; // used in get_frequent_items explicit frequent_items_sketch(uint8_t lg_max_map_size); frequent_items_sketch(uint8_t lg_start_map_size, uint8_t lg_max_map_size); class row; void update(const T& item, uint64_t weight = 1); void update(T&& item, uint64_t weight = 1); void merge(const frequent_items_sketch& other); bool is_empty() const; uint32_t get_num_active_items() const; uint64_t get_total_weight() const; uint64_t get_estimate(const T& item) const; uint64_t get_lower_bound(const T& item) const; uint64_t get_upper_bound(const T& item) const; uint64_t get_maximum_error() const; double get_epsilon() const; static double get_epsilon(uint8_t lg_max_map_size); static double get_apriori_error(uint8_t lg_max_map_size, uint64_t estimated_total_weight); typedef typename std::allocator_traits::template rebind_alloc AllocRow; std::vector get_frequent_items(frequent_items_error_type err_type, uint64_t threshold = USE_MAX_ERROR) const; size_t get_serialized_size_bytes() const; void serialize(std::ostream& os) const; std::pair serialize(unsigned header_size_bytes = 0) const; static frequent_items_sketch deserialize(std::istream& is); static frequent_items_sketch deserialize(const void* bytes, size_t size); void to_stream(std::ostream& os, bool print_items = false) const; private: static const uint8_t LG_MIN_MAP_SIZE = 3; static const uint8_t SERIAL_VERSION = 1; static const uint8_t FAMILY_ID = 10; static const uint8_t PREAMBLE_LONGS_EMPTY = 1; static const uint8_t PREAMBLE_LONGS_NONEMPTY = 4; static constexpr double EPSILON_FACTOR = 3.5; enum flags { IS_EMPTY }; uint64_t total_weight; uint64_t offset; reverse_purge_hash_map map; static void check_preamble_longs(uint8_t preamble_longs, bool is_empty); static void check_serial_version(uint8_t serial_version); static void check_family_id(uint8_t family_id); static void check_size(uint8_t lg_cur_size, uint8_t lg_max_size); }; // clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug" template const uint8_t frequent_items_sketch::LG_MIN_MAP_SIZE; template frequent_items_sketch::frequent_items_sketch(uint8_t lg_max_map_size): total_weight(0), offset(0), map(frequent_items_sketch::LG_MIN_MAP_SIZE, std::max(lg_max_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE)) { } template frequent_items_sketch::frequent_items_sketch(uint8_t lg_start_map_size, uint8_t lg_max_map_size): total_weight(0), offset(0), map(std::max(lg_start_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE), std::max(lg_max_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE)) { } template void frequent_items_sketch::update(const T& item, uint64_t weight) { if (weight == 0) return; total_weight += weight; offset += map.adjust_or_insert(item, weight); } template void frequent_items_sketch::update(T&& item, uint64_t weight) { if (weight == 0) return; total_weight += weight; offset += map.adjust_or_insert(std::move(item), weight); } template void frequent_items_sketch::merge(const frequent_items_sketch& other) { if (other.is_empty()) return; const uint64_t merged_total_weight = total_weight + other.get_total_weight(); // for correction at the end for (auto &it: other.map) { update(it.first, it.second); } offset += other.offset; total_weight = merged_total_weight; } template bool frequent_items_sketch::is_empty() const { return map.get_num_active() == 0; } template uint32_t frequent_items_sketch::get_num_active_items() const { return map.get_num_active(); } template uint64_t frequent_items_sketch::get_total_weight() const { return total_weight; } template uint64_t frequent_items_sketch::get_estimate(const T& item) const { // if item is tracked estimate = weight + offset, otherwise 0 const uint64_t weight = map.get(item); if (weight > 0) return weight + offset; return 0; } template uint64_t frequent_items_sketch::get_lower_bound(const T& item) const { return map.get(item); } template uint64_t frequent_items_sketch::get_upper_bound(const T& item) const { return map.get(item) + offset; } template uint64_t frequent_items_sketch::get_maximum_error() const { return offset; } template double frequent_items_sketch::get_epsilon() const { return EPSILON_FACTOR / (1 << map.get_lg_max_size()); } template double frequent_items_sketch::get_epsilon(uint8_t lg_max_map_size) { return EPSILON_FACTOR / (1 << lg_max_map_size); } template double frequent_items_sketch::get_apriori_error(uint8_t lg_max_map_size, uint64_t estimated_total_weight) { return get_epsilon(lg_max_map_size) * estimated_total_weight; } template class frequent_items_sketch::row { public: row(const T* item, uint64_t weight, uint64_t offset): item(item), weight(weight), offset(offset) {} const T& get_item() const { return *item; } uint64_t get_estimate() const { return weight + offset; } uint64_t get_lower_bound() const { return weight; } uint64_t get_upper_bound() const { return weight + offset; } private: const T* item; uint64_t weight; uint64_t offset; }; template std::vector::row, typename frequent_items_sketch::AllocRow> frequent_items_sketch::get_frequent_items(frequent_items_error_type err_type, uint64_t threshold) const { if (threshold == USE_MAX_ERROR) { threshold = get_maximum_error(); } std::vector items; for (auto &it: map) { const uint64_t lb = it.second; const uint64_t ub = it.second + offset; if ((err_type == NO_FALSE_NEGATIVES and ub > threshold) or (err_type == NO_FALSE_POSITIVES and lb > threshold)) { items.push_back(row(&it.first, it.second, offset)); } } // sort by estimate in descending order std::sort(items.begin(), items.end(), [](row a, row b){ return a.get_estimate() > b.get_estimate(); }); return items; } template void frequent_items_sketch::serialize(std::ostream& os) const { const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY; os.write((char*)&preamble_longs, sizeof(preamble_longs)); const uint8_t serial_version = SERIAL_VERSION; os.write((char*)&serial_version, sizeof(serial_version)); const uint8_t family = FAMILY_ID; os.write((char*)&family, sizeof(family)); const uint8_t lg_max_size = map.get_lg_max_size(); os.write((char*)&lg_max_size, sizeof(lg_max_size)); const uint8_t lg_cur_size = map.get_lg_cur_size(); os.write((char*)&lg_cur_size, sizeof(lg_cur_size)); const uint8_t flags_byte( (is_empty() ? 1 << flags::IS_EMPTY : 0) ); os.write((char*)&flags_byte, sizeof(flags_byte)); const uint16_t unused16 = 0; os.write((char*)&unused16, sizeof(unused16)); if (!is_empty()) { const uint32_t num_items = map.get_num_active(); os.write((char*)&num_items, sizeof(num_items)); const uint32_t unused32 = 0; os.write((char*)&unused32, sizeof(unused32)); os.write((char*)&total_weight, sizeof(total_weight)); os.write((char*)&offset, sizeof(offset)); // copy active items and their weights to use batch serialization typedef typename std::allocator_traits::template rebind_alloc AllocU64; uint64_t* weights = AllocU64().allocate(num_items); T* items = A().allocate(num_items); uint32_t i = 0; for (auto &it: map) { new (&items[i]) T(it.first); weights[i++] = it.second; } os.write((char*)weights, sizeof(uint64_t) * num_items); AllocU64().deallocate(weights, num_items); S().serialize(os, items, num_items); for (unsigned i = 0; i < num_items; i++) items[i].~T(); A().deallocate(items, num_items); } } template size_t frequent_items_sketch::get_serialized_size_bytes() const { if (is_empty()) return PREAMBLE_LONGS_EMPTY * sizeof(uint64_t); size_t size = (PREAMBLE_LONGS_NONEMPTY + map.get_num_active()) * sizeof(uint64_t); for (auto &it: map) size += S().size_of_item(it.first); return size; } template std::pair frequent_items_sketch::serialize(unsigned header_size_bytes) const { const size_t size = header_size_bytes + get_serialized_size_bytes(); typedef typename std::allocator_traits::template rebind_alloc AllocChar; void_ptr_with_deleter data_ptr( static_cast(AllocChar().allocate(size)), [size](void* ptr) { AllocChar().deallocate(static_cast(ptr), size); } ); char* ptr = static_cast(data_ptr.get()) + header_size_bytes; const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY; copy_to_mem(&preamble_longs, &ptr, sizeof(uint8_t)); const uint8_t serial_version = SERIAL_VERSION; copy_to_mem(&serial_version, &ptr, sizeof(uint8_t)); const uint8_t family = FAMILY_ID; copy_to_mem(&family, &ptr, sizeof(uint8_t)); const uint8_t lg_max_size = map.get_lg_max_size(); copy_to_mem(&lg_max_size, &ptr, sizeof(uint8_t)); const uint8_t lg_cur_size = map.get_lg_cur_size(); copy_to_mem(&lg_cur_size, &ptr, sizeof(uint8_t)); const uint8_t flags_byte( (is_empty() ? 1 << flags::IS_EMPTY : 0) ); copy_to_mem(&flags_byte, &ptr, sizeof(uint8_t)); const uint16_t unused16 = 0; copy_to_mem(&unused16, &ptr, sizeof(uint16_t)); if (!is_empty()) { const uint32_t num_items = map.get_num_active(); copy_to_mem(&num_items, &ptr, sizeof(uint32_t)); const uint32_t unused32 = 0; copy_to_mem(&unused32, &ptr, sizeof(uint32_t)); copy_to_mem(&total_weight, &ptr, sizeof(uint64_t)); copy_to_mem(&offset, &ptr, sizeof(uint64_t)); // copy active items and their weights to use batch serialization typedef typename std::allocator_traits::template rebind_alloc AllocU64; uint64_t* weights = AllocU64().allocate(num_items); T* items = A().allocate(num_items); uint32_t i = 0; for (auto &it: map) { new (&items[i]) T(it.first); weights[i++] = it.second; } copy_to_mem(weights, &ptr, sizeof(uint64_t) * num_items); AllocU64().deallocate(weights, num_items); ptr += S().serialize(ptr, items, num_items); for (unsigned i = 0; i < num_items; i++) items[i].~T(); A().deallocate(items, num_items); } return std::make_pair(std::move(data_ptr), size); } template frequent_items_sketch frequent_items_sketch::deserialize(std::istream& is) { uint8_t preamble_longs; is.read((char*)&preamble_longs, sizeof(preamble_longs)); uint8_t serial_version; is.read((char*)&serial_version, sizeof(serial_version)); uint8_t family_id; is.read((char*)&family_id, sizeof(family_id)); uint8_t lg_max_size; is.read((char*)&lg_max_size, sizeof(lg_max_size)); uint8_t lg_cur_size; is.read((char*)&lg_cur_size, sizeof(lg_cur_size)); uint8_t flags_byte; is.read((char*)&flags_byte, sizeof(flags_byte)); uint16_t unused16; is.read((char*)&unused16, sizeof(unused16)); const bool is_empty = flags_byte & (1 << flags::IS_EMPTY); check_preamble_longs(preamble_longs, is_empty); check_serial_version(serial_version); check_family_id(family_id); check_size(lg_cur_size, lg_max_size); frequent_items_sketch sketch(lg_cur_size, lg_max_size); if (!is_empty) { uint32_t num_items; is.read((char*)&num_items, sizeof(num_items)); uint32_t unused32; is.read((char*)&unused32, sizeof(unused32)); uint64_t total_weight; is.read((char*)&total_weight, sizeof(total_weight)); uint64_t offset; is.read((char*)&offset, sizeof(offset)); // batch deserialization with intermediate array of items and weights typedef typename std::allocator_traits::template rebind_alloc AllocU64; uint64_t* weights = AllocU64().allocate(num_items); is.read((char*)weights, sizeof(uint64_t) * num_items); T* items = A().allocate(num_items); // rely on serde to construct items S().deserialize(is, items, num_items); for (uint32_t i = 0; i < num_items; i++) { sketch.update(std::move(items[i]), weights[i]); items[i].~T(); } AllocU64().deallocate(weights, num_items); A().deallocate(items, num_items); sketch.total_weight = total_weight; sketch.offset = offset; } return sketch; } template frequent_items_sketch frequent_items_sketch::deserialize(const void* bytes, size_t size) { const char* ptr = static_cast(bytes); uint8_t preamble_longs; copy_from_mem(&ptr, &preamble_longs, sizeof(uint8_t)); uint8_t serial_version; copy_from_mem(&ptr, &serial_version, sizeof(uint8_t)); uint8_t family_id; copy_from_mem(&ptr, &family_id, sizeof(uint8_t)); uint8_t lg_max_size; copy_from_mem(&ptr, &lg_max_size, sizeof(uint8_t)); uint8_t lg_cur_size; copy_from_mem(&ptr, &lg_cur_size, sizeof(uint8_t)); uint8_t flags_byte; copy_from_mem(&ptr, &flags_byte, sizeof(uint8_t)); uint16_t unused16; copy_from_mem(&ptr, &unused16, sizeof(uint16_t)); const bool is_empty = flags_byte & (1 << flags::IS_EMPTY); check_preamble_longs(preamble_longs, is_empty); check_serial_version(serial_version); check_family_id(family_id); check_size(lg_cur_size, lg_max_size); frequent_items_sketch sketch(lg_cur_size, lg_max_size); if (!is_empty) { uint32_t num_items; copy_from_mem(&ptr, &num_items, sizeof(uint32_t)); uint32_t unused32; copy_from_mem(&ptr, &unused32, sizeof(uint32_t)); uint64_t total_weight; copy_from_mem(&ptr, &total_weight, sizeof(uint64_t)); uint64_t offset; copy_from_mem(&ptr, &offset, sizeof(uint64_t)); // batch deserialization with intermediate array of items and weights typedef typename std::allocator_traits::template rebind_alloc AllocU64; uint64_t* weights = AllocU64().allocate(num_items); copy_from_mem(&ptr, weights, sizeof(uint64_t) * num_items); T* items = A().allocate(num_items); ptr += S().deserialize(ptr, items, num_items); for (uint32_t i = 0; i < num_items; i++) { sketch.update(std::move(items[i]), weights[i]); items[i].~T(); } AllocU64().deallocate(weights, num_items); A().deallocate(items, num_items); sketch.total_weight = total_weight; sketch.offset = offset; } return sketch; } template void frequent_items_sketch::check_preamble_longs(uint8_t preamble_longs, bool is_empty) { if (is_empty) { if (preamble_longs != PREAMBLE_LONGS_EMPTY) { throw std::invalid_argument("Possible corruption: preamble longs of an empty sketch must be " + std::to_string(PREAMBLE_LONGS_EMPTY) + ": " + std::to_string(preamble_longs)); } } else { if (preamble_longs != PREAMBLE_LONGS_NONEMPTY) { throw std::invalid_argument("Possible corruption: preamble longs of an non-empty sketch must be " + std::to_string(PREAMBLE_LONGS_NONEMPTY) + ": " + std::to_string(preamble_longs)); } } } template void frequent_items_sketch::check_serial_version(uint8_t serial_version) { if (serial_version != SERIAL_VERSION) { throw std::invalid_argument("Possible corruption: serial version must be " + std::to_string(SERIAL_VERSION) + ": " + std::to_string(serial_version)); } } template void frequent_items_sketch::check_family_id(uint8_t family_id) { if (family_id != FAMILY_ID) { throw std::invalid_argument("Possible corruption: family ID must be " + std::to_string(FAMILY_ID) + ": " + std::to_string(family_id)); } } template void frequent_items_sketch::check_size(uint8_t lg_cur_size, uint8_t lg_max_size) { if (lg_cur_size > lg_max_size) { throw std::invalid_argument("Possible corruption: expected lg_cur_size <= lg_max_size: " + std::to_string(lg_cur_size) + " <= " + std::to_string(lg_max_size)); } if (lg_cur_size < LG_MIN_MAP_SIZE) { throw std::invalid_argument("Possible corruption: lg_cur_size must not be less than " + std::to_string(LG_MIN_MAP_SIZE) + ": " + std::to_string(lg_cur_size)); } } template void frequent_items_sketch::to_stream(std::ostream& os, bool print_items) const { os << "### Frequent items sketch summary:" << std::endl; os << " lg cur map size : " << (int) map.get_lg_cur_size() << std::endl; os << " lg max map size : " << (int) map.get_lg_max_size() << std::endl; os << " num active items : " << get_num_active_items() << std::endl; os << " total weight : " << get_total_weight() << std::endl; os << " max error : " << get_maximum_error() << std::endl; os << "### End sketch summary" << std::endl; if (print_items) { std::vector items; for (auto &it: map) { items.push_back(row(&it.first, it.second, offset)); } // sort by estimate in descending order std::sort(items.begin(), items.end(), [](row a, row b){ return a.get_estimate() > b.get_estimate(); }); os << "### Items in descending order by estimate" << std::endl; os << " item, estimate, lower bound, upper bound" << std::endl; for (auto &it: items) { os << " " << it.get_item() << ", " << it.get_estimate() << ", " << it.get_lower_bound() << ", " << it.get_upper_bound() << std::endl; } os << "### End items" << std::endl; } } } /* namespace datasketches */ # endif