/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef COUNT_MIN_IMPL_HPP_ #define COUNT_MIN_IMPL_HPP_ #include #include #include #include #include "MurmurHash3.h" #include "count_min.hpp" #include "memory_operations.hpp" namespace datasketches { template count_min_sketch::count_min_sketch(uint8_t num_hashes, uint32_t num_buckets, uint64_t seed, const A& allocator): _allocator(allocator), _num_hashes(num_hashes), _num_buckets(num_buckets), _sketch_array((num_hashes*num_buckets < 1<<30) ? num_hashes*num_buckets : 0, 0, _allocator), _seed(seed), _total_weight(0) { if (num_buckets < 3) throw std::invalid_argument("Using fewer than 3 buckets incurs relative error greater than 1."); // This check is to ensure later compatibility with a Java implementation whose maximum size can only // be 2^31-1. We check only against 2^30 for simplicity. if (num_buckets * num_hashes >= 1 << 30) { throw std::invalid_argument("These parameters generate a sketch that exceeds 2^30 elements." "Try reducing either the number of buckets or the number of hash functions."); } std::default_random_engine rng(_seed); std::uniform_int_distribution extra_hash_seeds(0, std::numeric_limits::max()); hash_seeds.reserve(num_hashes); for (uint64_t i=0; i < num_hashes; ++i) { hash_seeds.push_back(extra_hash_seeds(rng) + _seed); // Adds the global seed to all hash functions. } } template uint8_t count_min_sketch::get_num_hashes() const { return _num_hashes; } template uint32_t count_min_sketch::get_num_buckets() const { return _num_buckets; } template uint64_t count_min_sketch::get_seed() const { return _seed; } template double count_min_sketch::get_relative_error() const { return exp(1.0) / double(_num_buckets); } template W count_min_sketch::get_total_weight() const { return _total_weight; } template uint32_t count_min_sketch::suggest_num_buckets(double relative_error) { /* * Function to help users select a number of buckets for a given error. * TODO: Change this when we use only power of 2 buckets. */ if (relative_error < 0.) { throw std::invalid_argument("Relative error must be at least 0."); } return static_cast(ceil(exp(1.0) / relative_error)); } template uint8_t count_min_sketch::suggest_num_hashes(double confidence) { /* * Function to help users select a number of hashes for a given confidence * e.g. confidence = 1 - failure probability * failure probability == delta in the literature. */ if (confidence < 0. || confidence > 1.0) { throw std::invalid_argument("Confidence must be between 0 and 1.0 (inclusive)."); } return std::min(ceil(log(1.0 / (1.0 - confidence))), UINT8_MAX); } template std::vector count_min_sketch::get_hashes(const void* item, size_t size) const { /* * Returns the hash locations for the input item using the original hashing * scheme from [1]. * Generate _num_hashes separate hashes from calls to murmurmhash. * This could be optimized by keeping both of the 64bit parts of the hash * function, rather than generating a new one for every level. * * * Postscript. * Note that a tradeoff can be achieved over the update time and space * complexity of the sketch by using a combinatorial hashing scheme from * https://github.com/Claudenw/BloomFilter/wiki/Bloom-Filters----An-overview * https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf */ uint64_t bucket_index; std::vector sketch_update_locations; sketch_update_locations.reserve(_num_hashes); uint64_t hash_seed_index = 0; for (const auto &it: hash_seeds) { HashState hashes; MurmurHash3_x64_128(item, size, it, hashes); // ? BEWARE OVERFLOW. uint64_t hash = hashes.h1; bucket_index = hash % _num_buckets; sketch_update_locations.push_back((hash_seed_index * _num_buckets) + bucket_index); hash_seed_index += 1; } return sketch_update_locations; } template W count_min_sketch::get_estimate(uint64_t item) const {return get_estimate(&item, sizeof(item));} template W count_min_sketch::get_estimate(int64_t item) const {return get_estimate(&item, sizeof(item));} template W count_min_sketch::get_estimate(const std::string& item) const { if (item.empty()) return 0; // Empty strings are not inserted into the sketch. return get_estimate(item.c_str(), item.length()); } template W count_min_sketch::get_estimate(const void* item, size_t size) const { /* * Returns the estimated frequency of the item */ std::vector hash_locations = get_hashes(item, size); std::vector estimates; for (const auto h: hash_locations) { estimates.push_back(_sketch_array[h]); } return *std::min_element(estimates.begin(), estimates.end()); } template void count_min_sketch::update(uint64_t item, W weight) { update(&item, sizeof(item), weight); } template void count_min_sketch::update(int64_t item, W weight) { update(&item, sizeof(item), weight); } template void count_min_sketch::update(const std::string& item, W weight) { if (item.empty()) return; update(item.c_str(), item.length(), weight); } template void count_min_sketch::update(const void* item, size_t size, W weight) { /* * Gets the item's hash locations and then increments the sketch in those * locations by the weight. */ _total_weight += weight >= 0 ? weight : -weight; std::vector hash_locations = get_hashes(item, size); for (const auto h: hash_locations) { _sketch_array[h] += weight; } } template W count_min_sketch::get_upper_bound(uint64_t item) const {return get_upper_bound(&item, sizeof(item));} template W count_min_sketch::get_upper_bound(int64_t item) const {return get_upper_bound(&item, sizeof(item));} template W count_min_sketch::get_upper_bound(const std::string& item) const { if (item.empty()) return 0; // Empty strings are not inserted into the sketch. return get_upper_bound(item.c_str(), item.length()); } template W count_min_sketch::get_upper_bound(const void* item, size_t size) const { return static_cast(get_estimate(item, size) + get_relative_error() * get_total_weight()); } template W count_min_sketch::get_lower_bound(uint64_t item) const {return get_lower_bound(&item, sizeof(item));} template W count_min_sketch::get_lower_bound(int64_t item) const {return get_lower_bound(&item, sizeof(item));} template W count_min_sketch::get_lower_bound(const std::string& item) const { if (item.empty()) return 0; // Empty strings are not inserted into the sketch. return get_lower_bound(item.c_str(), item.length()); } template W count_min_sketch::get_lower_bound(const void* item, size_t size) const { return get_estimate(item, size); } template void count_min_sketch::merge(const count_min_sketch &other_sketch) { /* * Merges this sketch into other_sketch sketch by elementwise summing of buckets */ if (this == &other_sketch) { throw std::invalid_argument( "Cannot merge a sketch with itself." ); } bool acceptable_config = (get_num_hashes() == other_sketch.get_num_hashes()) && (get_num_buckets() == other_sketch.get_num_buckets()) && (get_seed() == other_sketch.get_seed()); if (!acceptable_config) { throw std::invalid_argument( "Incompatible sketch configuration." ); } // Merge step - iterate over the other vector and add the weights to this sketch auto it = _sketch_array.begin(); // This is a std::vector iterator. auto other_it = other_sketch.begin(); //This is a const iterator over the other sketch. while (it != _sketch_array.end()) { *it += *other_it; ++it; ++other_it; } _total_weight += other_sketch.get_total_weight(); } // Iterators template typename count_min_sketch::const_iterator count_min_sketch::begin() const { return _sketch_array.begin(); } template typename count_min_sketch::const_iterator count_min_sketch::end() const { return _sketch_array.end(); } template void count_min_sketch::serialize(std::ostream& os) const { // Long 0 //const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL; const uint8_t preamble_longs = PREAMBLE_LONGS_SHORT; const uint8_t ser_ver = SERIAL_VERSION_1; const uint8_t family_id = FAMILY_ID; const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0); const uint32_t unused32 = NULL_32; write(os, preamble_longs); write(os, ser_ver); write(os, family_id); write(os, flags_byte); write(os, unused32); // Long 1 const uint32_t nbuckets = _num_buckets; const uint8_t nhashes = _num_hashes; const uint16_t seed_hash(compute_seed_hash(_seed)); const uint8_t unused8 = NULL_8; write(os, nbuckets); write(os, nhashes); write(os, seed_hash); write(os, unused8); if (is_empty()) return; // sketch is empty, no need to write further bytes. // Long 2 write(os, _total_weight); // Long 3 onwards: remaining bytes are consumed by writing the weight and the array values. auto it = _sketch_array.begin(); while (it != _sketch_array.end()) { write(os, *it); ++it; } } template auto count_min_sketch::deserialize(std::istream& is, uint64_t seed, const A& allocator) -> count_min_sketch { // First 8 bytes are 4 bytes of preamble and 4 unused bytes. const auto preamble_longs = read(is); const auto serial_version = read(is); const auto family_id = read(is); const auto flags_byte = read(is); read(is); // 4 unused bytes check_header_validity(preamble_longs, serial_version, family_id, flags_byte); // Sketch parameters const auto nbuckets = read(is); const auto nhashes = read(is); const auto seed_hash = read(is); read(is); // 1 unused byte if (seed_hash != compute_seed_hash(seed)) { throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash) + ", " + std::to_string(compute_seed_hash(seed))); } count_min_sketch c(nhashes, nbuckets, seed, allocator); const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0; if (is_empty == 1) return c; // sketch is empty, no need to read further. // Set the sketch weight and read in the sketch values const auto weight = read(is); c._total_weight += weight; read(is, c._sketch_array.data(), sizeof(W) * c._sketch_array.size()); return c; } template size_t count_min_sketch::get_serialized_size_bytes() const { // The header is always 2 longs, whether empty or full const size_t preamble_longs = PREAMBLE_LONGS_SHORT; // If the sketch is empty, we're done. Otherwise, we need the total weight // held by the sketch as well as a data table of size (num_buckets * num_hashes) return (preamble_longs * sizeof(uint64_t)) + (is_empty() ? 0 : sizeof(W) * (1 + _num_buckets * _num_hashes)); } template auto count_min_sketch::serialize(unsigned header_size_bytes) const -> vector_bytes { vector_bytes bytes(header_size_bytes + get_serialized_size_bytes(), 0, _allocator); uint8_t *ptr = bytes.data() + header_size_bytes; // Long 0 const uint8_t preamble_longs = PREAMBLE_LONGS_SHORT; ptr += copy_to_mem(preamble_longs, ptr); const uint8_t ser_ver = SERIAL_VERSION_1; ptr += copy_to_mem(ser_ver, ptr); const uint8_t family_id = FAMILY_ID; ptr += copy_to_mem(family_id, ptr); const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0); ptr += copy_to_mem(flags_byte, ptr); const uint32_t unused32 = NULL_32; ptr += copy_to_mem(unused32, ptr); // Long 1 const uint32_t nbuckets = _num_buckets; const uint8_t nhashes = _num_hashes; const uint16_t seed_hash(compute_seed_hash(_seed)); const uint8_t null_characters_8 = NULL_8; ptr += copy_to_mem(nbuckets, ptr); ptr += copy_to_mem(nhashes, ptr); ptr += copy_to_mem(seed_hash, ptr); ptr += copy_to_mem(null_characters_8, ptr); if (is_empty()) return bytes; // sketch is empty, no need to write further bytes. // Long 2 const W t_weight = _total_weight; ptr += copy_to_mem(t_weight, ptr); // Long 3 onwards: remaining bytes are consumed by writing the weight and the array values. auto it = _sketch_array.begin(); while (it != _sketch_array.end()) { ptr += copy_to_mem(*it, ptr); ++it; } return bytes; } template auto count_min_sketch::deserialize(const void* bytes, size_t size, uint64_t seed, const A& allocator) -> count_min_sketch { ensure_minimum_memory(size, PREAMBLE_LONGS_SHORT * sizeof(uint64_t)); const char* ptr = static_cast(bytes); // First 8 bytes are 4 bytes of preamble and 4 unused bytes. uint8_t preamble_longs; ptr += copy_from_mem(ptr, preamble_longs); uint8_t serial_version; ptr += copy_from_mem(ptr, serial_version); uint8_t family_id; ptr += copy_from_mem(ptr, family_id); uint8_t flags_byte; ptr += copy_from_mem(ptr, flags_byte); ptr += sizeof(uint32_t); check_header_validity(preamble_longs, serial_version, family_id, flags_byte); // Second 8 bytes are the sketch parameters with a final, unused byte. uint32_t nbuckets; uint8_t nhashes; uint16_t seed_hash; ptr += copy_from_mem(ptr, nbuckets); ptr += copy_from_mem(ptr, nhashes); ptr += copy_from_mem(ptr, seed_hash); ptr += sizeof(uint8_t); if (seed_hash != compute_seed_hash(seed)) { throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash) + ", " + std::to_string(compute_seed_hash(seed))); } count_min_sketch c(nhashes, nbuckets, seed, allocator); const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0; if (is_empty) return c; // sketch is empty, no need to read further. ensure_minimum_memory(size, sizeof(W) * (1 + nbuckets * nhashes)); // Long 2 is the weight. W weight; ptr += copy_from_mem(ptr, weight); c._total_weight += weight; // All remaining bytes are the sketch table entries. for (size_t i = 0; i bool count_min_sketch::is_empty() const { return _total_weight == 0; } template string count_min_sketch::to_string() const { // count the number of used entries in the sketch uint64_t num_nonzero = 0; for (const auto entry: _sketch_array) { if (entry != static_cast(0.0)) ++num_nonzero; } // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements. // The stream does not support passing an allocator instance, and alternatives are complicated. std::ostringstream os; os << "### Count Min sketch summary:" << std::endl; os << " num hashes : " << static_cast(_num_hashes) << std::endl; os << " num buckets : " << _num_buckets << std::endl; os << " capacity bins : " << _sketch_array.size() << std::endl; os << " filled bins : " << num_nonzero << std::endl; os << " pct filled : " << std::setprecision(3) << (num_nonzero * 100.0) / _sketch_array.size() << "%" << std::endl; os << "### End sketch summary" << std::endl; return string(os.str().c_str(), _allocator); } template void count_min_sketch::check_header_validity(uint8_t preamble_longs, uint8_t serial_version, uint8_t family_id, uint8_t flags_byte) { const bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0; const uint8_t sw = (empty ? 1 : 0) + (2 * serial_version) + (4 * family_id) + (32 * (preamble_longs & 0x3F)); bool valid = true; switch (sw) { // exhaustive list and description of all valid cases case 138 : break; // !empty, ser_ver==1, family==18, preLongs=2; case 139 : break; // empty, ser_ver==1, family==18, preLongs=2; //case 170 : break; // !empty, ser_ver==1, family==18, preLongs=3; default : // all other case values are invalid valid = false; } if (!valid) { std::ostringstream os; os << "Possible sketch corruption. Inconsistent state: " << "preamble_longs = " << static_cast(preamble_longs) << ", empty = " << (empty ? "true" : "false") << ", serialization_version = " << static_cast(serial_version); throw std::invalid_argument(os.str()); } } } /* namespace datasketches */ #endif