/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef KLL_QUANTILE_CALCULATOR_HPP_ #define KLL_QUANTILE_CALCULATOR_HPP_ #include #include #include #include "kll_helper.hpp" namespace datasketches { template class kll_quantile_calculator { typedef typename std::allocator_traits::template rebind_alloc AllocU32; typedef typename std::allocator_traits::template rebind_alloc AllocU64; public: // assumes that all levels are sorted including level 0 kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) { n_ = n; const uint32_t num_items = levels[num_levels] - levels[0]; items_ = A().allocate(num_items); weights_ = AllocU64().allocate(num_items + 1); // one more is intentional levels_size_ = num_levels + 1; levels_ = AllocU32().allocate(levels_size_); populate_from_sketch(items, num_items, levels, num_levels); blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_); convert_to_preceding_cummulative(weights_, num_items + 1); } ~kll_quantile_calculator() { const uint32_t num_items = levels_[num_levels_] - levels_[0]; for (uint32_t i = 0; i < num_items; i++) items_[i].~T(); A().deallocate(items_, num_items); AllocU64().deallocate(weights_, num_items + 1); AllocU32().deallocate(levels_, levels_size_); } T get_quantile(double fraction) const { return approximately_answer_positional_query(pos_of_phi(fraction, n_)); } private: uint64_t n_; T* items_; uint64_t* weights_; uint32_t* levels_; uint8_t levels_size_; uint8_t num_levels_; void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) { kll_helper::copy_construct(items, levels[0], levels[num_levels], items_, 0); uint8_t src_level = 0; uint8_t dst_level = 0; uint64_t weight = 1; uint32_t offset = levels[0]; while (src_level < num_levels) { const uint32_t from_index(levels[src_level] - offset); const uint32_t to_index(levels[src_level + 1] - offset); // exclusive if (from_index < to_index) { // skip empty levels std::fill(&weights_[from_index], &weights_[to_index], weight); levels_[dst_level] = from_index; levels_[dst_level + 1] = to_index; dst_level++; } src_level++; weight *= 2; } weights_[num_items] = 0; num_levels_ = dst_level; } T approximately_answer_positional_query(uint64_t pos) const { assert (pos < n_); const uint32_t weights_size(levels_[num_levels_] + 1); const uint32_t index = chunk_containing_pos(weights_, weights_size, pos); return items_[index]; } static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) { uint64_t subtotal(0); for (uint32_t i = 0; i < weights_size; i++) { const uint32_t new_subtotal = subtotal + weights[i]; weights[i] = subtotal; subtotal = new_subtotal; } } static uint64_t pos_of_phi(double phi, uint64_t n) { const uint64_t pos = std::floor(phi * n); return (pos == n) ? n - 1 : pos; } static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) { assert (weights_size > 1); // remember, weights_ contains an "extra" position const uint32_t nominal_length(weights_size - 1); assert (weights[0] <= pos); assert (pos < weights[nominal_length]); return search_for_chunk_containing_pos(weights, pos, 0, nominal_length); } static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) { if (l + 1 == r) { return l; } const uint32_t m(l + (r - l) / 2); if (arr[m] <= pos) { return search_for_chunk_containing_pos(arr, pos, m, r); } return search_for_chunk_containing_pos(arr, pos, l, m); } static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) { if (num_levels == 1) return; // move the input in preparation for the "ping-pong" reduction strategy auto tmp_items_deleter = [num_items](T* ptr) { for (uint32_t i = 0; i < num_items; i++) ptr[i].~T(); A().deallocate(ptr, num_items); }; std::unique_ptr tmp_items(A().allocate(num_items), tmp_items_deleter); kll_helper::move_construct(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); }; std::unique_ptr tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here std::copy(weights, &weights[num_items], tmp_weights.get()); blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels); } static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) { if (num_levels == 1) return; const uint8_t num_levels_1 = num_levels / 2; const uint8_t num_levels_2 = num_levels - num_levels_1; assert (num_levels_1 >= 1); assert (num_levels_2 >= num_levels_1); const uint8_t starting_level_1 = starting_level; const uint8_t starting_level_2 = starting_level + num_levels_1; // swap roles of src and dst blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1); blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2); tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2); } static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) { const auto from_index_1 = levels[starting_level_1]; const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive const auto from_index_2 = levels[starting_level_2]; const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive auto i_src_1 = from_index_1; auto i_src_2 = from_index_2; auto i_dst = from_index_1; while ((i_src_1 < to_index_1) and (i_src_2 < to_index_2)) { if (C()(items_src[i_src_1], items_src[i_src_2])) { items_dst[i_dst] = std::move(items_src[i_src_1]); weights_dst[i_dst] = weights_src[i_src_1]; i_src_1++; } else { items_dst[i_dst] = std::move(items_src[i_src_2]); weights_dst[i_dst] = weights_src[i_src_2]; i_src_2++; } i_dst++; } if (i_src_1 < to_index_1) { std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]); std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]); } else if (i_src_2 < to_index_2) { std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]); std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]); } } }; } /* namespace datasketches */ #endif // KLL_QUANTILE_CALCULATOR_HPP_