/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef CPC_UNION_IMPL_HPP_ #define CPC_UNION_IMPL_HPP_ #include "count_zeros.hpp" #include namespace datasketches { template cpc_union_alloc::cpc_union_alloc(uint8_t lg_k, uint64_t seed, const A& allocator): lg_k(lg_k), seed(seed), accumulator(nullptr), bit_matrix(allocator) { if (lg_k < cpc_constants::MIN_LG_K || lg_k > cpc_constants::MAX_LG_K) { throw std::invalid_argument("lg_k must be >= " + std::to_string(cpc_constants::MIN_LG_K) + " and <= " + std::to_string(cpc_constants::MAX_LG_K) + ": " + std::to_string(lg_k)); } accumulator = new (AllocCpc(allocator).allocate(1)) cpc_sketch_alloc(lg_k, seed, allocator); } template cpc_union_alloc::cpc_union_alloc(const cpc_union_alloc& other): lg_k(other.lg_k), seed(other.seed), accumulator(other.accumulator), bit_matrix(other.bit_matrix) { if (accumulator != nullptr) { accumulator = new (AllocCpc(accumulator->get_allocator()).allocate(1)) cpc_sketch_alloc(*other.accumulator); } } template cpc_union_alloc::cpc_union_alloc(cpc_union_alloc&& other) noexcept: lg_k(other.lg_k), seed(other.seed), accumulator(other.accumulator), bit_matrix(std::move(other.bit_matrix)) { other.accumulator = nullptr; } template cpc_union_alloc::~cpc_union_alloc() { if (accumulator != nullptr) { AllocCpc allocator(accumulator->get_allocator()); accumulator->~cpc_sketch_alloc(); allocator.deallocate(accumulator, 1); } } template cpc_union_alloc& cpc_union_alloc::operator=(const cpc_union_alloc& other) { cpc_union_alloc copy(other); std::swap(lg_k, copy.lg_k); seed = copy.seed; std::swap(accumulator, copy.accumulator); bit_matrix = std::move(copy.bit_matrix); return *this; } template cpc_union_alloc& cpc_union_alloc::operator=(cpc_union_alloc&& other) noexcept { std::swap(lg_k, other.lg_k); seed = other.seed; std::swap(accumulator, other.accumulator); bit_matrix = std::move(other.bit_matrix); return *this; } template void cpc_union_alloc::update(const cpc_sketch_alloc& sketch) { internal_update(sketch); } template void cpc_union_alloc::update(cpc_sketch_alloc&& sketch) { internal_update(std::forward>(sketch)); } template template void cpc_union_alloc::internal_update(S&& sketch) { const uint16_t seed_hash_union = compute_seed_hash(seed); const uint16_t seed_hash_sketch = compute_seed_hash(sketch.seed); if (seed_hash_union != seed_hash_sketch) { throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash_union) + ", " + std::to_string(seed_hash_sketch)); } const auto src_flavor = sketch.determine_flavor(); if (cpc_sketch_alloc::flavor::EMPTY == src_flavor) return; if (sketch.get_lg_k() < lg_k) reduce_k(sketch.get_lg_k()); if (sketch.get_lg_k() < lg_k) throw std::logic_error("sketch lg_k < union lg_k"); if (accumulator == nullptr && bit_matrix.size() == 0) throw std::logic_error("both accumulator and bit matrix are absent"); if (cpc_sketch_alloc::flavor::SPARSE == src_flavor && accumulator != nullptr) { // Case A if (bit_matrix.size() > 0) throw std::logic_error("union bit_matrix is not expected"); const auto initial_dest_flavor = accumulator->determine_flavor(); if (cpc_sketch_alloc::flavor::EMPTY != initial_dest_flavor && cpc_sketch_alloc::flavor::SPARSE != initial_dest_flavor) throw std::logic_error("wrong flavor"); // The following partially fixes the snowplow problem provided that the K's are equal. if (cpc_sketch_alloc::flavor::EMPTY == initial_dest_flavor && lg_k == sketch.get_lg_k()) { *accumulator = std::forward(sketch); return; } walk_table_updating_sketch(sketch.surprising_value_table); const auto final_dst_flavor = accumulator->determine_flavor(); // if the accumulator has graduated beyond sparse, switch to a bit matrix representation if (final_dst_flavor != cpc_sketch_alloc::flavor::EMPTY && final_dst_flavor != cpc_sketch_alloc::flavor::SPARSE) { switch_to_bit_matrix(); } return; } if (cpc_sketch_alloc::flavor::SPARSE == src_flavor && bit_matrix.size() > 0) { // Case B if (accumulator != nullptr) throw std::logic_error("union accumulator != null"); or_table_into_matrix(sketch.surprising_value_table); return; } if (cpc_sketch_alloc::flavor::HYBRID != src_flavor && cpc_sketch_alloc::flavor::PINNED != src_flavor && cpc_sketch_alloc::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor"); // source is past SPARSE mode, so make sure that dest is a bit matrix if (accumulator != nullptr) { if (bit_matrix.size() > 0) throw std::logic_error("union bit matrix is not expected"); const auto dst_flavor = accumulator->determine_flavor(); if (cpc_sketch_alloc::flavor::EMPTY != dst_flavor && cpc_sketch_alloc::flavor::SPARSE != dst_flavor) { throw std::logic_error("wrong flavor"); } switch_to_bit_matrix(); } if (bit_matrix.size() == 0) throw std::logic_error("union bit_matrix is expected"); if (cpc_sketch_alloc::flavor::HYBRID == src_flavor || cpc_sketch_alloc::flavor::PINNED == src_flavor) { // Case C or_window_into_matrix(sketch.sliding_window, sketch.window_offset, sketch.get_lg_k()); or_table_into_matrix(sketch.surprising_value_table); return; } // SLIDING mode involves inverted logic, so we can't just walk the source sketch. // Instead, we convert it to a bitMatrix that can be OR'ed into the destination. if (cpc_sketch_alloc::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor"); // Case D vector_u64 src_matrix = sketch.build_bit_matrix(); or_matrix_into_matrix(src_matrix, sketch.get_lg_k()); } template cpc_sketch_alloc cpc_union_alloc::get_result() const { if (accumulator != nullptr) { if (bit_matrix.size() > 0) throw std::logic_error("bit_matrix is not expected"); return get_result_from_accumulator(); } if (bit_matrix.size() == 0) throw std::logic_error("bit_matrix is expected"); return get_result_from_bit_matrix(); } template cpc_sketch_alloc cpc_union_alloc::get_result_from_accumulator() const { if (lg_k != accumulator->get_lg_k()) throw std::logic_error("lg_k != accumulator->lg_k"); if (accumulator->get_num_coupons() == 0) { return cpc_sketch_alloc(lg_k, seed, accumulator->get_allocator()); } if (accumulator->determine_flavor() != cpc_sketch_alloc::flavor::SPARSE) throw std::logic_error("wrong flavor"); cpc_sketch_alloc copy(*accumulator); copy.was_merged = true; return copy; } template cpc_sketch_alloc cpc_union_alloc::get_result_from_bit_matrix() const { const uint32_t k = 1 << lg_k; const uint32_t num_coupons = count_bits_set_in_matrix(bit_matrix.data(), k); const auto flavor = cpc_sketch_alloc::determine_flavor(lg_k, num_coupons); if (flavor != cpc_sketch_alloc::flavor::HYBRID && flavor != cpc_sketch_alloc::flavor::PINNED && flavor != cpc_sketch_alloc::flavor::SLIDING) throw std::logic_error("wrong flavor"); const uint8_t offset = cpc_sketch_alloc::determine_correct_offset(lg_k, num_coupons); vector_bytes sliding_window(k, 0, bit_matrix.get_allocator()); // don't need to zero the window's memory // dynamically growing caused snowplow effect uint8_t table_lg_size = lg_k - 4; // K/16; in some cases this will end up being oversized if (table_lg_size < 2) table_lg_size = 2; u32_table table(table_lg_size, 6 + lg_k, bit_matrix.get_allocator()); // the following should work even when the offset is zero const uint64_t mask_for_clearing_window = (static_cast(0xff) << offset) ^ UINT64_MAX; const uint64_t mask_for_flipping_early_zone = (static_cast(1) << offset) - 1; uint64_t all_surprises_ored = 0; // The snowplow effect was caused by processing the rows in order, // but we have fixed it by using a sufficiently large hash table. for (uint32_t i = 0; i < k; i++) { uint64_t pattern = bit_matrix[i]; sliding_window[i] = (pattern >> offset) & 0xff; pattern &= mask_for_clearing_window; pattern ^= mask_for_flipping_early_zone; // this flipping converts surprising 0's to 1's all_surprises_ored |= pattern; while (pattern != 0) { const uint8_t col = count_trailing_zeros_in_u64(pattern); pattern = pattern ^ (static_cast(1) << col); // erase the 1 const uint32_t row_col = (i << 6) | col; bool is_novel = table.maybe_insert(row_col); if (!is_novel) throw std::logic_error("is_novel != true"); } } // at this point we could shrink an oversized hash table, but the relative waste isn't very big uint8_t first_interesting_column = count_trailing_zeros_in_u64(all_surprises_ored); if (first_interesting_column > offset) first_interesting_column = offset; // corner case // HIP-related fields will contain zeros, and that is okay return cpc_sketch_alloc(lg_k, num_coupons, first_interesting_column, std::move(table), std::move(sliding_window), false, 0, 0, seed); } template void cpc_union_alloc::switch_to_bit_matrix() { bit_matrix = accumulator->build_bit_matrix(); AllocCpc allocator(accumulator->get_allocator()); accumulator->~cpc_sketch_alloc(); allocator.deallocate(accumulator, 1); accumulator = nullptr; } template void cpc_union_alloc::walk_table_updating_sketch(const u32_table& table) { const uint32_t* slots = table.get_slots(); const uint32_t num_slots = 1 << table.get_lg_size(); const uint64_t dst_mask = (((1 << accumulator->get_lg_k()) - 1) << 6) | 63; // downsamples when dst lgK < src LgK // Using a golden ratio stride fixes the snowplow effect. const double golden = 0.6180339887498949025; uint32_t stride = static_cast(golden * static_cast(num_slots)); if (stride < 2) throw std::logic_error("stride < 2"); if (stride == ((stride >> 1) << 1)) stride += 1; // force the stride to be odd if (stride < 3 || stride >= num_slots) throw std::out_of_range("stride out of range"); for (uint32_t i = 0, j = 0; i < num_slots; i++, j += stride) { j &= num_slots - 1; const uint32_t row_col = slots[j]; if (row_col != UINT32_MAX) { accumulator->row_col_update(row_col & dst_mask); } } } template void cpc_union_alloc::or_table_into_matrix(const u32_table& table) { const uint32_t* slots = table.get_slots(); const uint32_t num_slots = 1 << table.get_lg_size(); const uint64_t dest_mask = (1 << lg_k) - 1; // downsamples when dst lgK < sr LgK for (uint32_t i = 0; i < num_slots; i++) { const uint32_t row_col = slots[i]; if (row_col != UINT32_MAX) { const uint8_t col = row_col & 63; const uint32_t row = row_col >> 6; bit_matrix[row & dest_mask] |= static_cast(1) << col; // set the bit } } } template void cpc_union_alloc::or_window_into_matrix(const vector_bytes& sliding_window, uint8_t offset, uint8_t src_lg_k) { if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK"); const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK const uint32_t src_k = 1 << src_lg_k; for (uint32_t src_row = 0; src_row < src_k; src_row++) { bit_matrix[src_row & dst_mask] |= static_cast(sliding_window[src_row]) << offset; } } template void cpc_union_alloc::or_matrix_into_matrix(const vector_u64& src_matrix, uint8_t src_lg_k) { if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK"); const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK const uint32_t src_k = 1 << src_lg_k; for (uint32_t src_row = 0; src_row < src_k; src_row++) { bit_matrix[src_row & dst_mask] |= src_matrix[src_row]; } } template void cpc_union_alloc::reduce_k(uint8_t new_lg_k) { if (new_lg_k >= lg_k) throw std::logic_error("new LgK >= union lgK"); if (accumulator == nullptr && bit_matrix.size() == 0) throw std::logic_error("both accumulator and bit_matrix are absent"); if (bit_matrix.size() > 0) { // downsample the unioner's bit matrix if (accumulator != nullptr) throw std::logic_error("accumulator is not null"); vector_u64 old_matrix = std::move(bit_matrix); const uint8_t old_lg_k = lg_k; const uint32_t new_k = 1 << new_lg_k; bit_matrix = vector_u64(new_k, 0, old_matrix.get_allocator()); lg_k = new_lg_k; or_matrix_into_matrix(old_matrix, old_lg_k); return; } if (accumulator != nullptr) { // downsample the unioner's sketch if (bit_matrix.size() > 0) throw std::logic_error("bit_matrix is not expected"); if (!accumulator->is_empty()) { cpc_sketch_alloc old_accumulator(*accumulator); *accumulator = cpc_sketch_alloc(new_lg_k, seed, old_accumulator.get_allocator()); walk_table_updating_sketch(old_accumulator.surprising_value_table); } lg_k = new_lg_k; const auto final_new_flavor = accumulator->determine_flavor(); // if the new sketch has graduated beyond sparse, convert to bit_matrix if (final_new_flavor != cpc_sketch_alloc::flavor::EMPTY && final_new_flavor != cpc_sketch_alloc::flavor::SPARSE) { switch_to_bit_matrix(); } return; } throw std::logic_error("invalid state"); } } /* namespace datasketches */ #endif