#include
#include
#include
#include
#include
#include "postgres.h"
#include "libpq/md5.h"
#include "hyperloglog.h"
/* we're using md5, which produces 16B (128-bit) values */
#define HASH_LENGTH 16
/* Alpha constants, for various numbers of 'b'.
*
* According to hyperloglog_create the 'b' values are between 4 and 16,
* so the array has 16 non-zero items matching indexes 4, 5, ..., 16.
* This makes it very easy to access the constants.
*/
static float alpha[] = {0, 0, 0, 0, 0.673, 0.697, 0.709, 0.7153, 0.7183, 0.7198, 0.7205,
0.7209, 0.7211, 0.7212, 0.7213, 0.7213, 0.7213};
int hyperloglog_get_min_bit(const unsigned char * buffer, int byteFrom, int bytes);
int hyperloglog_get_r(const unsigned char * buffer, int byteFrom, int bytes);
int hyperloglog_estimate(HyperLogLogCounter hloglog);
void hyperloglog_add_hash(HyperLogLogCounter hloglog, const unsigned char * hash);
void hyperloglog_reset_internal(HyperLogLogCounter hloglog);
/* Allocate HLL estimator that can handle the desired cartinality and precision.
*
* TODO The ndistinct is not currently used to determine size of the bin (number of
* bits used to store the counter) - it's always 1B=8bits, for now, to make it
* easier to work with. See the header file (hyperloglog.h) for discussion of this.
*
* parameters:
* ndistinct - cardinality the estimator should handle
* error - requested error rate (0 - 1, where 0 means 'exact')
*
* returns:
* instance of HLL estimator (throws ERROR in case of failure)
*/
HyperLogLogCounter hyperloglog_create(int64 ndistinct, float error) {
float m;
size_t length = hyperloglog_get_size(ndistinct, error);
/* the bitmap is allocated as part of this memory block (-1 as one bin is already in) */
HyperLogLogCounter p = (HyperLogLogCounter)palloc(length);
/* target error rate needs to be between 0 and 1 */
if (error <= 0 || error >= 1)
elog(ERROR, "invalid error rate requested - only values in (0,1) allowed");
/* what is the minimum number of bins to achieve the requested error rate? we'll
* increase this to the nearest power of two later */
m = 1.04 / (error * error);
/* so how many bits do we need to index the bins (nearest power of two) */
p->b = (int)ceil(log2(m));
/* TODO Is there actually a good reason to limit the number precision to 16 bits? We're
* using MD5, so we have 128 bits available ... It'll require more memory - 16 bits is 65k
* bins, requiring 65kB of memory, which indeed is a lot. But why not to allow that if
* that's what was requested? */
if (p->b < 4) /* we want at least 2^4 (=16) bins */
p->b = 4;
else if (p->b > 16)
elog(ERROR, "number of index bits exceeds 16 (requested %d)", p->b);
p->m= (int)pow(2, p->b);
memset(p->data, 0, p->m);
/* use 1B for a counter by default */
p->binbits = 8;
SET_VARSIZE(p, length);
return p;
}
/* Performs a simple 'copy' of the counter, i.e. allocates a new counter and copies
* the state from the supplied one. */
HyperLogLogCounter hyperloglog_copy(HyperLogLogCounter counter) {
size_t length = VARSIZE(counter);
HyperLogLogCounter copy = (HyperLogLogCounter)palloc(length);
memcpy(copy, counter, length);
return copy;
}
/* Merges the two estimators. Either modifies the first estimator in place (inplace=true),
* or creates a new copy and returns that (inplace=false). Modification in place is very
* handy in aggregates, when we really want to modify the aggregate state in place.
*
* Mering is only possible if the counters share the same parameters (number of bins,
* bin size, ...). If the counters don't match, this throws an ERROR. */
HyperLogLogCounter hyperloglog_merge(HyperLogLogCounter counter1, HyperLogLogCounter counter2, bool inplace) {
int i;
HyperLogLogCounter result;
/* check compatibility first */
if (counter1->length != counter2->length)
elog(ERROR, "sizes of estimators differs (%d != %d)", counter1->length, counter2->length);
else if (counter1->b != counter2->b)
elog(ERROR, "index size of estimators differs (%d != %d)", counter1->b, counter2->b);
else if (counter1->m != counter2->m)
elog(ERROR, "bin count of estimators differs (%d != %d)", counter1->m, counter2->m);
else if (counter1->binbits != counter2->binbits)
elog(ERROR, "bin size of estimators differs (%d != %d)", counter1->binbits, counter2->binbits);
/* shall we create a new estimator, or merge into counter1 */
if (! inplace)
result = hyperloglog_copy(counter1);
else
result = counter1;
/* copy the state of the estimator */
for (i = 0; i < result->m; i++)
result->data[i] = (result->data[i] > counter2->data[i]) ? result->data[i] : counter2->data[i];
return result;
}
/* Computes size of the structure, depending on the requested error rate.
*
* TODO The ndistinct is not currently used to determine size of the bin.
*/
int hyperloglog_get_size(int64 ndistinct, float error) {
int b;
float m;
if (error <= 0 || error >= 1)
elog(ERROR, "invalid error rate requested");
m = 1.04 / (error * error);
b = (int)ceil(log2(m));
if (b < 4)
b = 4;
else if (b > 16)
elog(ERROR, "number of bits in HyperLogLog exceeds 16");
return offsetof(HyperLogLogCounterData,data) + (int)pow(2, b);
}
/* searches for the leftmost 1 (aka 'rho' in the algorithm) */
int hyperloglog_get_min_bit(const unsigned char * buffer, int bitfrom, int nbits) {
int b = 0;
int byteIdx = 0;
int bitIdx = 0;
for (b = bitfrom; b < nbits; b++) {
byteIdx = b / 8;
bitIdx = b % 8;
if ((buffer[byteIdx] & (0x1 << bitIdx)) != 0)
return (b - bitfrom + 1);
}
return (nbits-bitfrom) + 1;
}
/*
* Computes the HLL estimate, as described in the paper.
*
* In short it does these steps:
*
* 1) sums the data in counters (1/2^m[i])
* 2) computes the raw estimate E
* 3) corrects the estimate for low/high values
*
*/
int hyperloglog_estimate(HyperLogLogCounter hloglog) {
double sum = 0, E = 0;
int j;
/* compute the sum for the indicator function */
for (j = 0; j < hloglog->m; j++)
sum += (1.0 / pow(2, hloglog->data[j]));
/* and finally the estimate itself */
E = alpha[hloglog->b] * pow(hloglog->m, 2) / sum;
if (E <= (5.0 * hloglog->m / 2)) {
int V = 0;
for (j = 0; j < hloglog->m; j++)
if (hloglog->data[j] == 0)
V += 1;
if (V != 0)
E = hloglog->m * log(hloglog->m / (float)V);
} else if (E > pow(2, 32) / 30) {
E = -pow(2,32) * log(1 - E / pow(2,32));
}
return (int)E;
}
void hyperloglog_add_element(HyperLogLogCounter hloglog, const char * element, int elen) {
/* get the hash */
unsigned char hash[HASH_LENGTH];
/* compute the hash using the salt */
pg_md5_binary(element, elen, hash);
/* add the hash to the estimator */
hyperloglog_add_hash(hloglog, hash);
}
/*
* Adds a 32-bit hash (computed from the actual item) into the counter. First it computes
* the counter index from the first 32 bits of the hash, then uses the remaining data to
* compute the value of the counter (aka 'rho').
*
* This assumes the hash is at least 12B of data (96b = 32b + 64b), as it supports 1B
* counters / rho values returning values up to 64 (which could fit into less than 8b,
* but whatever).
*
* However we're internally using MD5, which produces 128b (16B), so this is OK. Using
* a shorter hash values (say CRC32 producing 32b values) would be possible too, but it
* would require some tweaks.
*
* Important note - it's essential to keep the counter index / rho independent, otherwise
* the estimates will be much worse than the requested error rate. For example the 'rho'
* was originally computed like this:
*
* rho = hyperloglog_get_min_bit(&hash[4], hloglog->b, 64);
*
* which is not independent with the index because of how the shifts work. So the current
* implementation simply uses the next 4B for rho.
*/
void hyperloglog_add_hash(HyperLogLogCounter hloglog, const unsigned char * hash) {
/* get the hash */
unsigned int idx;
char rho;
/* which stream is this (keep only the first 'b' bits) */
memcpy(&idx, hash, sizeof(int));
idx = idx >> (32 - hloglog->b);
/* get the min bit (but skip the bits used for stream index) */
/* needs to be independent from 'idx' */
rho = hyperloglog_get_min_bit(&hash[4], 0, 64); /* 64-bit hash */
/* keep the highest value */
hloglog->data[idx] = (rho > (hloglog->data[idx])) ? rho : hloglog->data[idx];
}
/* Just reset the counter (set all the counters to 0). */
void hyperloglog_reset_internal(HyperLogLogCounter hloglog) {
memset(hloglog->data, 0, hloglog->m);
}