Vectorized Operations (VOPS)

Motivation

PostgreSQL looks very competitive with other mainstream databases on OLTP workload (execution of large number of simple queries). But on OLAP queries, requiring processing of larger volumes of data, DBMS-es oriented on analytic queries processing can provide an order of magnitude better speed. Let's investigate why it happen and can we do something to make PostgreSQL efficient also for OLAP queries.

Where DBMS spent most of the time during processing OLAP queries?

Profiling execution of queries shows several main factors, limiting Postgres performance:

  1. Unpacking tuple overhead (tuple_deform). To be able to access column values, Postgres needs to deform the tuple. Values can be compressed, stored at some other page (TOAST), ... Also, as far as size of column can be varying, to extract N-th column we need to unpack preceding N-1 columns. So deforming tuple is quite expensive operation, especially for tables with large number of attributes. In some cases rearranging columns in the table allows to significantly reduce query execution time. Another universal approach is to split table into two: one small with frequently accessed scalar columns and another with rarely used large columns. Certainly in this case we need to perform extra join but it allows to several times reduce amount of fetched data. In queries like TPC-H Q6, tuple deform takes about 40% of total query execution time.
  2. Interpretation overhead. Postgres compiler and optimizer build tree representing query execution plan. So query executor performs recursive invocation of evaluate functions for nodes of this tree. Implementation of some nodes also contain switches used to select requested action. So query plan is interpreted by Postgres query executor rather than directly executed. Usually interpreter is about 10 times slower than native code. This is why elimination of interpretation overhead allows to several times increase query speed, especially for queries with complex predicates where most time is spent in expression evaluation.
  3. Abstraction penalty. Support of abstract (user defined) types and operations is one of the key features of Postgres. It's executor is able to deal not only with built-in set of scalar types (like integer, real, ...) but with any types defined by user (for example complex, point,...). But the price of such flexibility is that each operations requires function call. Instead of adding to integers directly, Postgres executor invokes function which performs addition of two integers. Certainly in this case function call overhead is much larger then performed operation itself. Function call overhead is also increased because of Postgres function call convention requiring passing parameter values through memory (not using register call convention).
  4. Pull model overhead. Postgres executor is implementing classical Volcano-style query execution model - pull model. Operand's values are pulled by operator. It simplifies executor and operators implementation. But it has negative impact on performance, because leave nodes (fetching tuple from heap or index page) have to do a lot of extra work saving and restoring their context.
  5. MVCC overhead. Postgres provides multiversion concurrency control, which allows multiple transactions to concurrently work with the same record without blocking each other. It is goods for frequently updated data (OLTP), but for read-only or append-only data in OLAP scenarios it adds just extra overhead. Both space overhead (about 20 extra bytes per tuple) and CPU overhead (checking visibility of each tuple).

There are many different ways of addressing this issues. For example we can use JIT (Just-In-Time) compiler to generate native code for query and eliminate interpretation overhead and increase heap deform speed. We can rewrite optimizer from pull to push model. We can try to optimize tuple format to make heap deforming more efficient. Or we can generate byte code for query execution plan which interpretation is more efficient than recursive invocation of evaluate function for each node because of better access locality. But all this approaches require significant rewriting of Postgres executor and some of them also require changes of all Postgres architecture.

But there is an approach which allows to address most of this issues without radical changes of executor. It is vector operations. It is explained in next section.

Vertical storage

Traditional query executor (like Postgres executor) deals with single row of data at each moment of time. If it has to evaluate expression (x+y) then it fetches value of "x", then value of "y", performs operation "+" and returns the result value to the upper node. In contrast vectorized executor is able to process in one operation multiple values. In this case "x" and "y" represent not just a single scalar value, but vector of values and result is also a vector of values. In vector execution model interpretation and function call overhead is divided by size of vector. The price of performing function call is the same, but as far as function proceeds N values instead of just one, this overhead become less critical.

What is the optimal size for the vector? From the explanation above it is clear that the larger vector is, the less per-row overhead we have. So we can form vector from all values of the correspondent table attribute. It is so called vertical data model or columnar store. Unlike classical "horizontal" data model where the unit of storing data is row (tuple), here we have vertical columns. Columnar store has the following main advantages:

There are several DBMS-es implementing columnar store model. Most popular are Vertical, MonetDB. But actually performing operation on the whole column is not so good idea. Table can be very large (OLAP queries are used to work with large data sets), so vector can also be very big and even doesn't fit in memory. But even if it fits in memory, working with such larger vectors prevent efficient utilization of CPU caches (L1, L2,...). Consider expression (x+y)*(x-y). Vector executor performs addition of two vectors : "x" and "y" and produces result vector "r1". But when last element of vector "r" is produced, first elements of vector "r1" are already thrown from CPU cache, as well as first elements of "x" and "y" vectors. So when we need to calculate (x-y) we once again have to load data for "x" and "y" from slow memory to fast cache. Then we produce "r2" and perform multiplication of "r1" and "r2". But here we also need first to load data for this vectors into the CPU cache.

So it is more efficient to split column into relatively small chunks (or tiles - there is no single notion for it accepted by everyone). This chunk is a unit of processing by vectorized executor. Size of such chunk is chosen to keep all operands of vector operations in cache even for complex expressions. Typical size of chunk is from 100 to 1000 elements. So in case of (x+y)*(x-y) expression, we calculate it not for the whole column but only for 100 values (assume that size of the chunk is 100). Splitting columns into chunks in successors of MonetDB x100 and HyPer allows to increase speed up to ten times.

VOPS

Overview

There are several attempts to integrate columnar store in PostgreSQL. The most known is CStore FDW by CitusDB. It is implemented as foreign data wrapper (FDW) and is efficient for queries fetching relatively small fraction of columns. But it is using standard Postgres raw-based executor and so is not able to take advantages of vector processing. There is interesting project done by CitusDB intern. He implements vector operations on top of CStore using executors hooks for some nodes. IT reports 4-6 times speedup for grand aggregates and 3 times speedup for aggregation with group by.

Another project is IMCS: In-Memory Columnar Store. Here columnar store is implemented in memory and is accessed using special functions. So you can not use standard SQL query to work with this storage - you have to rewrite it using IMCS functions. IMCS provides vector operations (using tiles) and parallel execution.

Both CStore and IMCS are keeping data outside Postgres. But what if we want to use vector operations for data kept in standard Postgres tables? Definitely, the best approach is to impalement alternative heap format. Or even further: eliminate notion of heap at all - treat heap just as yet another access method, similar with other indexes.

But such radical changes requires deep redesign of all Postgres architecture. It will be better to estimate first possible advantages we can expect from usage of vector vector operations. Vector executor is widely discussed in Postgres forums, but efficient vector executor is not possible without underlying support at storage layer. Advantages of vector processing will be annihilated if vectors are formed from attributes of rows extracted from existed Postgres heap page.

The idea of VOPS extension is to implement vector operations for tiles represented as special Postgres types. Tiles should be used as table column types instead of scalar types. For example instead of "real" we should use "vops_float4" which is tile representing up to 64 values of the correspondent column. Why 64? There are several reasons for choosing this number:

  1. We provide efficient access to tiles, we need that size_of_tile*size_of_attribute*number_of_attributes is smaller than page size. Typical record contains about 10 attributes, default size of Postgres page is 8kb.
  2. 64 is number of bits in large word. We need to maintain bitmask to mark null values. Certainly it is possible to store bitmask in array with arbitrary size, but manipulation with single 64-bit integer is more efficient.
  3. Due to the arguments above, to efficiently utilize cache, size of tile should be in range 100..1000.

VOPS is implemented as Postgres extension. It doesn't change anything in Postgres executor and page format. It also doesn't setup any executors hooks or alter query execution plan. The main idea of this project was to measure speedup which can be reached by using vector operation with existed executor and heap manager. VOPS provides set of standard operators for tile types, allowing to write SQL queries in the way similar with normal SQL queries. Right now vector operators can be used inside predicates and aggregate expressions. Joins are not currently supported. Details of VOPS architecture are described below.

Types

VOPS supports all basic Postgres numeric types: 1,2,4,8 byte integers and 4,8 bytes floats. Also it supports date and timestamp types but them are using the same implementation as int4 and int8 correspondingly.

SQL typeC typeVOPS tile type
boolboolvops_bool
"char"charvops_char
int2int16vops_int2
int4int32vops_int4
int8int64vops_int8
float4float4vops_float4
float8float8vops_float8
dateDateADTvops_date
timestampTimestampvops_timestamp
intervalIntervalvops_interval
char(n)textvops_text(n)
varchar(n)textvops_text(n)

Vector operators

VOPS provides implementation of all built-in SQL arithmetic operations for numeric types: + - / * Certainly it also implements all comparison operators: = <> > >= < <=. Operands of such operators can be either tiles, either scalar constants: x=y or x=1.

Boolean operators and, or, not can not be overloaded. This is why VOPS provides instead of them operators & | !. Please notice that precedence of this operators is different from and, or, not operators. So you can not write predicate as x=1 | x=2 - it will cause syntax error. To solve this problem please use parenthesis: (x=1) | (x=2).

Also VOPS provides analog of between operator. In SQL expression (x BETWEEN a AND b) is equivalent to (x >= a AND x <= b). But as far as AND operator can not be overloaded, such substitution will not work for VOPS tiles. This is why VOPS provides special function for range check. Unfortunately BETWEEN is reserved keyword, so no function with such name can be defined. This is why synonym BETWIXT is used.

.

Postgres requires predicate expression to have boolean type. But result of vector boolean operators is vops_bool, not bool. This is why compiler doesn't allow to use it in predicate. The problem can be solved by introducing special filter function. This function is given arbitrary vector boolean expression and returns normal boolean which ... is always true. So from Postgres executor point of view predicate value is always true. But filter function sets filter_mask which is actually used in subsequent operators to determine selected records. So query in VOPS looks something like this:

  select sum(price) from trades where filter(day >= '2017-01-01'::date);

Please notice one more difference from normal sequence: we have to use explicit cast of string constant to appreciate data type (date type in this example). For betwixt function it is not needed:

  select sum(price) from trades where filter(betwixt(day, '2017-01-01', '2017-02-01'));

For char, int2 and int4 types VOPS provides concatenation operator || which produces doubled integer type: (char || char) -> int2, (int2 || int2) -> int4, (int4 || int4) -> int8. Them can be used for grouping by several columns (see below).

OperatorDescription
+Addition
-Binary subtraction or unary negation
*Multiplication
/Division
=Equals
<>Not equals
<Less than
<=Less than or Equals
>Greater than
>=Greater than or equals
&Boolean AND
|Boolean OR
!Boolean NOT
bitwixt(x,low,high)Analog of BETWEEN
is_null(x)Analog of IS NULL
is_not_null(x)Analog of IS NOT NULL
ifnull(x,subst)Analog of COALESCE

Vector aggregates

OLAP queries usually perform some kind of aggregation of large volumes of data. These includes grand aggregates which are calculated for the whole table or aggregates with group by which are calculated for each group. VOPS implements all standard SQL aggregates: count, min, max, sum, avg, var_pop, var_sampl, variance, stddev_pop, stddev_samp, stddev. Also it provides approaxdc for approximation of distinct count. Them can be used exactly in the same way as in normal SQL queries:

select sum(l_extendedprice*l_discount) as revenue
from vops_lineitem
where filter(betwixt(l_shipdate, '1996-01-01', '1997-01-01')
        & betwixt(l_discount, 0.08, 0.1)
        & (l_quantity < 24));

Also VOPS provides weighted average aggregate VWAP which can be used to calculate volume-weighted average price:

select wavg(l_extendedprice,l_quantity) from vops_lineitem;

It is possible to get first/last value of each group using first/last aggregates. This aggregates require two vector arguments: first one is value itself and second one - is timestamp by which sorting in each group is done:

select first(bid_price,ts),last(ask_price,ts) from vquote group by symbol;

Using aggregation with group by is more complex. VOPS provides two functions for it: map and reduce. The work is actually done by map(group_by_expression, aggregate_list, expr {, expr }) VOPS implements aggregation using hash table, which entries collect aggregate states for all groups. And set returning function reduce just iterates through the hash table consrtucted by map. reduce function is needed because result of aggregate in Postgres can not be a set. So aggregate query with group by looks something like this:

select reduce(map(l_returnflag||l_linestatus, 'sum,sum,sum,sum,avg,avg,avg',
    l_quantity,
    l_extendedprice,
    l_extendedprice*(1-l_discount),
    l_extendedprice*(1-l_discount)*(1+l_tax),
    l_quantity,
    l_extendedprice,
    l_discount)) from vops_lineitem where filter(l_shipdate <= '1998-12-01'::date);

Here we use concatenation operator to perform grouping by two columns. Right now VOPS supports grouping only by integer type. Another serious restriction is that all aggregated expressions should have the same type, for example vops_float4. It is not possible to calculate aggregates for vops_float4 and vopd_int8 columns in one call of map function, because it accepts aggregation arguments as variadic array, so all elements of this array should have the same type.

Aggregate string in map function should contain list of requested aggregate functions, separated by colon. Standard lowercase names should be used: count, sum, agg, min, max. Count is executed for the particular column: count(x). There is no need to explicitly specify count(*) because number of records in each group is returned by reduce function in any case.

reduce function returns set of vops_aggregate type. It contains three components: value of group by expression, number of records in the group and array of floats with aggregate values. Please notice that values of all aggregates, including count and min/max, are returned as floats.

create type vops_aggregates as(group_by int8, count int8, aggs float8[]);
create function reduce(bigint) returns setof vops_aggregates;

But there is much simple and straightforward way of performing group aggregates using VOPS. We need to partition table by group by fields. In this case grouping keys will be stored in normal way and other fields - inside tiles. Now Postgres executor will execute VOPS aggregates for each group:

select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice*(1-l_discount)) as sum_disc_price,
    sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    countall(*) as count_order
from
    vops_lineitem_projection
where
    filter(l_shipdate <= '1998-12-01'::date)
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;

In this example l_returnflag and l_linestatus fields of table vops_lineitem_projection have "char" type while all other used fields - tile types (l_shipdate has type vops_date and other fields - vops_float4). The query above is executed even faster than query with reduce(map(...)). The main problem with this approach is that you have to create projection for each combination of group by keys you want to use in queries.

Vector window functions

VOPS provides limited support of Postgres window functions. It implements count, sum, min, max, avg and lag functions. But unfortunately Postgres requires aggregates to have to similar final type for moving (window) and plain implementations. This is why VOPS has to choose define this aggregate under different names: mcount, msum, mmin, mmax, mavg.

There are also two important restrictions:

  1. Filtering, grouping and sorting can be done only by scalar (non-tile) attributes
  2. Only rows between unbounded preceding and current row frame is supported (but there is special version of msum which accepts extra window size parameter)

Example of using window functions with VOPS:

select unnest(t.*) from (select mcount(*) over w,mcount(x) over w,msum(x) over w,mavg(x) over w,mmin(x) over w,mmax(x) over w,x - lag(x) over w 
from v window w as (rows between unbounded preceding and current row)) t;

Using indexes

Analytic queries are usually performed on the data for which no indexes are defined. And columnar store vector operations are most efficient in this case. But it is still possible to use indexes with VOPS.

As far as each VOPS tile represents multiple values, index can be used only for some preliminary, non-precise filtering of data. It is something similar with BRIN indexes. VOPS provides four functions: first, last, high, low which can be used to obtain high/low boundary of values stored in the tile. First two functions first and last should be used for sorted data set. In this case first value is the smallest value in the tile and last value is the largest value in the tile. If data is not sorted, then lowhigh functions should be used, which are more expensive, because them need to inspect all tile values. Using this four function it is possible to construct functional indexes for VOPS table. BRIN index seems to be the best choice for VOPS table:

create index low_boundary on trades using brin(first(day)); -- trades table is ordered by day
create index high_boundary on trades using brin(last(day)); -- trades table is ordered by day

Now it is possible to use this indexes in query. Please notice that we have to recheck precise condition because index gives only approximate result:

select sum(price) from trades where first(day) >= '2015-01-01' and last(day) <= '2016-01-01'
                                               and filter(betwixt(day, '2015-01-01', '2016-01-01'));

Preparing data for VOPS

Now the most interesting question (from which may be we should start) - how we managed to prepare data for VOPS queries? Who and how will combine attribute values of several rows inside one VOPS tile? It is done by populate functions, provided by VOPS extension.

First of all you need to create table with columns having VOPS tile types. It can map all columns of the original table or just some most frequently used subset of them. This table can be treated as projection of original table (this concept of projections is taken from Vertica). Projection should include columns which are most frequently used together in queries.

Original table from TPC-H benchmark:

create table lineitem(
   l_orderkey integer,
   l_partkey integer,
   l_suppkey integer,
   l_linenumber integer,
   l_quantity real,
   l_extendedprice real,
   l_discount real,
   l_tax real,
   l_returnflag "char",
   l_linestatus "char",
   l_shipdate date,
   l_commitdate date,
   l_receiptdate date,
   l_shipinstruct char(25),
   l_shipmode char(10),
   l_comment char(44));

VOPS projection of this table:

create table vops_lineitem(
   l_shipdate vops_date not null,
   l_quantity vops_float4 not null,
   l_extendedprice vops_float4 not null,
   l_discount vops_float4 not null,
   l_tax vops_float4 not null,
   l_returnflag vops_char not null,
   l_linestatus vops_char not null
);

Original table can be treated as write optimized storage (WOS). If it has not indexes, then Postgres is able to provide very fast insertion speed, comparable with raw disk write speed. Projection in VOPS format can be treated as read-optimized storage (ROS), most efficient for execution of OLAP queries.

Data can be transferred from original to projected table using VOPS populate function:

create function populate(destination regclass, 
                        source regclass, 
                        predicate cstring default null, 
                        sort cstring default null) returns bigint;

Two first mandatory arguments of this function specify target and source tables. Optional predicate and sort clauses allow to restrict amount of imported data and enforce requested order. By specifying predicate it is possible to update VOPS table using only most recently received records. This functions returns number of loaded records. Example of populate function invocation:

select populate(destination := 'vops_lineitem'::regclass, source := 'lineitem'::regclass);

You can use populated table in queries performing sequential scan. VOPS operators can speed up filtering of records and calculation of aggregates. Aggregation with group by requires use of reduce + map functions. But as it was mentioned above in the section describing aggregates, it is possible to populate table in such way, that standard Postgres grouping algorithm will be used.

We need to choose partitioning keys and sort original table by this keys. Combination of partitioning keys expected to be NOT unique - otherwise tiles can only increase used space and lead to performance degradation. But if there are a lot of duplicates, then "collapsing" them and storing other fields in tiles will help to reduce space and speed up queries. Let's create the following projection of lineitems table:

create table vops_lineitem_projection(                                                                                    
   l_shipdate vops_date not null,
   l_quantity vops_float4 not null,
   l_extendedprice vops_float4 not null,
   l_discount vops_float4 not null,
   l_tax vops_float4 not null,
   l_returnflag "char" not null,
   l_linestatus "char" not null
);

As you can see, in this table l_returnflag and l_linestatus fields are scalars, and other fields - tiles. This projection can be populated using the following command:

select populate(destination := 'vops_lineitem_projection'::regclass, source := 'lineitem_projection'::regclass, sort := 'l_returnflag,l_linestatus');

Now we can create normal index on partitioning keys, define standard predicates for them and use them in group by and order by clauses.

Sometimes it is not possible or not desirable to store two copies of the same dataset. VOPS allows to load data directly from CSV file into VOPS table with tiles, bypassing creation of normal (plain) table. It can be done using import function:

select import(destination := 'vops_lineitem'::regclass, csv_path := '/mnt/data/lineitem.csv', separator := '|');

import function is defined in this way:

create function import(destination regclass, 
                       csv_path cstring, 
                       separator cstring default ',', 
                       skip integer default 0) returns bigint;

It accepts name of target VOPS table, path to CSV file, optional separator (default is ',') and number of lines in CSV header (no header by default). The function returns number of imported rows.

Back to normal tuples

A query from VOPS projection returns set of tiles. Output function of tile type is able to print content of the tile. But in some cases it is preferable to transfer result to normal (horizontal) format where each tuple represents one record. It can be done using unnest function:

postgres=# select unnest(l.*) from vops_lineitem l where filter(l_shipdate <= '1998-12-01'::date) limit 3;
                unnest                 
---------------------------------------
 (1996-03-13,17,33078.9,0.04,0.02,N,O)
 (1996-04-12,36,38306.2,0.09,0.06,N,O)
 (1996-01-29,8,15479.7,0.1,0.02,N,O)
(3 rows)

Back to normal tables

As it was mentioned in previous section, unnest function can scatter records with VOPS types into normal records with scalar types. So it is possible to use this records in arbitrary SQL queries. But there are two problems with unnest function:

  1. It is not convenient to use. This function has no static knowledge about the format of output record and this is why programmer has to specify it manually, if here wants to decompose this record.
  2. PostgreSQL optimizer has completely no knowledge on result of transformation performed by unnest() function. This is why it is not able to choose optimal query execution plan for data retrieved from VOPS table.

Fortunately Postgres provides solution for both of this problem: foreign data wrappers (FDW). In our case data is not really "foreign": it is stored inside our own database. But in alternatives (VOPS) format. VOPS FDW allows to "hide" specific of VOPS format and run normal SQL queries on VOPS tables. FDW allows the following:

  1. Extract data from VOPS table in normal (horizontal) format so that it can be proceeded by upper nodes in query execution plan.
  2. Pushdown to VOPS operations that can be efficiently executed using vectorized operations on VOPS types: filtering and aggregation.
  3. Provide statistic for underlying table which can be used by query optimizer.

So, by placing VOPS projection under FDW, we can efficiently perform sequential scan and aggregation queries as if them will be explicitly written for VOPS table and at the same time be able to execute any other queries on this data, including joins, CTEs,... Query can be written in standard SQL without usage of any VOPS specific functions.

Below is an example of creating VOPS FDW and running some queries on it:

create foreign table lineitem_fdw  (
   l_suppkey int4 not null,
   l_orderkey int4 not null,
   l_partkey int4 not null,
   l_shipdate date not null,
   l_quantity float4 not null,
   l_extendedprice float4 not null,
   l_discount float4 not null,
   l_tax      float4 not null,
   l_returnflag "char" not null,
   l_linestatus "char" not null
) server vops_server options (table_name 'vops_lineitem');

explain select
   sum(l_extendedprice*l_discount) as revenue
from
   lineitem_fdw
where
   l_shipdate between '1996-01-01' and '1997-01-01'
   and l_discount between 0.08 and 0.1
   and l_quantity < 24;
                       QUERY PLAN                        
---------------------------------------------------------
 Foreign Scan  (cost=1903.26..1664020.23 rows=1 width=4)
(1 row)

-- Filter was pushed down to FDW

explain select
    n_name,
    count(*),
    sum(l_extendedprice * (1-l_discount)) as revenue
from
    customer_fdw join orders_fdw on c_custkey = o_custkey
    join lineitem_fdw on l_orderkey = o_orderkey
    join supplier_fdw on l_suppkey = s_suppkey
    join nation on c_nationkey = n_nationkey
    join region on n_regionkey = r_regionkey
where
    c_nationkey = s_nationkey
    and r_name = 'ASIA'
    and o_orderdate >= '1996-01-01'
    and o_orderdate < '1997-01-01'
group by
    n_name
order by
    revenue desc;
                                                              QUERY PLAN                                                              
--------------------------------------------------------------------------------------------------------------------------------------
 Sort  (cost=2337312.28..2337312.78 rows=200 width=48)
   Sort Key: (sum((lineitem_fdw.l_extendedprice * ('1'::double precision - lineitem_fdw.l_discount)))) DESC
   ->  GroupAggregate  (cost=2336881.54..2337304.64 rows=200 width=48)
         Group Key: nation.n_name
         ->  Sort  (cost=2336881.54..2336951.73 rows=28073 width=40)
               Sort Key: nation.n_name
               ->  Hash Join  (cost=396050.65..2334807.39 rows=28073 width=40)
                     Hash Cond: ((orders_fdw.o_custkey = customer_fdw.c_custkey) AND (nation.n_nationkey = customer_fdw.c_nationkey))
                     ->  Hash Join  (cost=335084.53..2247223.46 rows=701672 width=52)
                           Hash Cond: (lineitem_fdw.l_orderkey = orders_fdw.o_orderkey)
                           ->  Hash Join  (cost=2887.07..1786058.18 rows=4607421 width=52)
                                 Hash Cond: (lineitem_fdw.l_suppkey = supplier_fdw.s_suppkey)
                                 ->  Foreign Scan on lineitem_fdw  (cost=0.00..1512151.52 rows=59986176 width=16)
                                 ->  Hash  (cost=2790.80..2790.80 rows=7702 width=44)
                                       ->  Hash Join  (cost=40.97..2790.80 rows=7702 width=44)
                                             Hash Cond: (supplier_fdw.s_nationkey = nation.n_nationkey)
                                             ->  Foreign Scan on supplier_fdw  (cost=0.00..2174.64 rows=100032 width=8)
                                             ->  Hash  (cost=40.79..40.79 rows=15 width=36)
                                                   ->  Hash Join  (cost=20.05..40.79 rows=15 width=36)
                                                         Hash Cond: (nation.n_regionkey = region.r_regionkey)
                                                         ->  Seq Scan on nation  (cost=0.00..17.70 rows=770 width=40)
                                                         ->  Hash  (cost=20.00..20.00 rows=4 width=4)
                                                               ->  Seq Scan on region  (cost=0.00..20.00 rows=4 width=4)
                                                                     Filter: ((r_name)::text = 'ASIA'::text)
                           ->  Hash  (cost=294718.76..294718.76 rows=2284376 width=8)
                                 ->  Foreign Scan on orders_fdw  (cost=0.00..294718.76 rows=2284376 width=8)
                     ->  Hash  (cost=32605.64..32605.64 rows=1500032 width=8)
                           ->  Foreign Scan on customer_fdw  (cost=0.00..32605.64 rows=1500032 width=8)

-- filter on orders range is pushed to FDW

Unfortunately Postgres FDW mechanism is not parallel safe: it is using cursors which right now doesn't support parallel execution. It means that queries on foreign tables can ont use parallel plans, so them are so several times (proportional to number of parallel workers) slower than queries on normal tables. This is why it is ont recommended to use VOPS in this way.

Standard SQL query transformation

Previous section describes VOPS specific types, operators, functions,... Good news! You do not need to learn them. You can use normal SQL. Well, it is still responsibility of programmer or database administrator to create proper projections of original table. This projections need to use tiles types for some attributes (vops_float4,...). Then you can query this table using standard SQL. And this query will be executed using vector operations!

How it works? There are absolutely no magic here. There are four main components of the puzzle:

  1. User defined types
  2. User defined operator
  3. User defined implicit type casts
  4. Post parse analyze hook which performs query transformation

So VOPS defines tile types and standard SQL operators for this types. Then it defines implicit type cast from vops_bool (result of boolean operation with tiles) to boolean type. Now programmer do not have to wrap vectorized boolean operations in filter() function call. And the final transformation is done by post parse analyze hook, defined by VOPS extension. It replaces scalar boolean operations with vector boolean operations:

Original expressionResult of transformation
NOT filter(o1)filter(vops_bool_not(o1))
filter(o1) AND filter(o2)filter(vops_bool_and(o1, o2))
filter(o1) OR filter(o2)filter(vops_bool_or(o1, o2))

Now there is no need to use VOPS specific BETIXT operator: standard SQL BETWEEN operator will work (but still using BETIXT is slightly more efficient, because it performs both comparions in one function). Also there are no problems with operators precedence and extra parenthesis are not needed. If query includes vectorized aggregates, then count(*) is transformed to countall(*).

There is only one difference left between standard SQL and its vectorized extension. You still have to perform explicit type cast in case of using string literal, for example l_shipdate <= '1998-12-01' will not work for l_shipdate column with tile type. Postgres have two overloaded versions of <= operator which can be applied here:

  1. vops_date <= vops_date
  2. vops_date <= date

And it decides that it is better to convert string to the tile type vops_date. In principle, it is possible to provide such conversion operator. But it is not good idea, because we have to generate dummy tile with all components equal to the specified constant and perform (vector OP vector) operation instead of more efficient (vector OP scalar).

There is one pitfall with post parse analyze hook: it is initialized in the extension _PG_init function. But if extension was not registered in shared_preload_libraries list, then it will be loaded on demand when any function of this extension is requested. Unfortunately it happens after parse analyze is done. So first time you execute VOPS query, it will not be transformed. You can get wrong result in this case. Either take it in account, either add vops to session_preload_libraries configuration string. VOPS extension provides special function vops_initialize() which can be invoked to force initialization of VOPS extension. After invocation of this function, extension will be loaded and all subsequent queries will be normally transformed and produce expected results. But more convenient way is to add 'vops' to session_preload_libraries parameter.

Table projections

In previous section we described how it is possible to write queries for VOPS tables using normal SQL. This VOPS tables can be treated as projections of the main tables.

VOPS provides some functions simplifying creation and usage of projections. In future it may be added to SQL grammar, so that it is possible to write "CREATE PROJECTION xxx OF TABLE yyy(column1, column2,...) GROUP BY (column1, column2, ...)". But right now it can be done using create_projection(projection_name text, source_table regclass, vector_columns text[], scalar_columns text[] default null, order_by text default null) function. First argument of this function specifies name of the projection, second refers to existed Postgres table, vector_columns is array of column names which should be stores as VOPS tiles, scalar_columns is array of grouping columns which type is preserved and optional order_by parameter specifies name of ordering attribute (explained below). The create_projection(PNAME,...) functions does the following:

  1. Creates projection table with specified name and attributes.
  2. Creates PNAME_refresh() functions which can be used to update projection.
  3. Creates functional BRIN indexes for first() and last() functions of ordering attribute (if any)
  4. Creates BRIN index on grouping attributes (if any)
  5. Insert information about created projection in vops_projections table. This table is used by optimizer to automatically substitute table with partition.

The order_by attribute is on of the VOPS projection vector columns by which data is sorted. Usually it is some kind of timestamp used in time series (for example trade date). Presence of such column in projection allows to incrementally update projection. Generated PNAME_refresh() method calls populate method with correspondent values of predicate and sort parameters, selecting from original table only rows with order_by column value greater than maximal value of this column in projection. It assumes that order_by is unique or at least refresh is done at the moment when there is some gap in collected events. In addition to order_by, sort list for populate includes all scalar (grouping) columns. It allows to efficiently group imported data by scalar columns and fill VOPS tiles (vector columns) with data.

When order_by attribute is specified, VOPS creates two functional BRIN indexes on first() and last() functions of this attribute. Presence of such indexes allows to efficiently select time slices. If original query contains predicates like (trade_date between '01-01-2017' and '01-01-2018') then VOPS projection substitution mechanism adds (first(trade_date) >= '01-01-2017' and last(trade_date) >= '01-01-2018') conjuncts which allows Postgres optimizer to use BRIN indexes to locate affected pages.

In in addition to BRIN indexes for order_by attribute, VOPS also creates BRIN index for grouping (scalar) columns. Such index allows to efficiently select groups and perform index join.

Like materialized views, VOPS projections are not updated automatically. It is responsibility of programmer to periodically refresh them. Certainly it is possible to define trigger or rule which will automatically insert data in projection table when original table is updated. But such approach will be extremely inefficient and slow. To take advantage of vector processing, VOPS has to group data in tiles. It can be done only if there is some batch of data which can be grouped by scalar attributes. If you insert records in projection table on-by-one, then most of VOPS tiles will contain just one element. The most convenient way is to use generated PNAME_refresh() function. If order_by attribute is specified, this function imports from original table only the new data (not present in projection).

The main advantage of VOPS projection mechanism is that it allows to automatically substitute queries on original tables with projections. There is vops.auto_substitute_projections configuration parameter which allows to switch on such substitution. By default it is switched off, because VOPS projects may be not synchronized with original table and query on projection may return different result. Right now projections can be automatically substituted only if:

Projection can be removed using drop_projection(projection_name text) function. It not only drops the correspondent table, but also removes information about it from vops_partitions table and drops generated refresh function.

Example

The most popular benchmark for OLAP is TPC-H. It contains 21 different queries. We adopted for VOPS only two of them: Q1 and Q6 which are not using joins. Most of fragments of this code are already mentioned above, but here we collect it together:

-- Standard way of creating extension
create extension vops; 

-- Original TPC-H table
create table lineitem(
   l_orderkey integer,
   l_partkey integer,
   l_suppkey integer,
   l_linenumber integer,
   l_quantity real,
   l_extendedprice real,
   l_discount real,
   l_tax real,
   l_returnflag "char",
   l_linestatus "char",
   l_shipdate date,
   l_commitdate date,
   l_receiptdate date,
   l_shipinstruct char(25),
   l_shipmode char(10),
   l_comment char(44),
   l_dummy char(1)); -- this table is needed because of terminator after last column in generated data 

-- Import data to it
copy lineitem from '/mnt/data/lineitem.tbl' delimiter '|' csv;

-- Create VOPS projection
create table vops_lineitem(
   l_shipdate vops_date not null,
   l_quantity vops_float4 not null,
   l_extendedprice vops_float4 not null,
   l_discount vops_float4 not null,
   l_tax vops_float4 not null,
   l_returnflag vops_char not null,
   l_linestatus vops_char not null
);

-- Copy data to the projection table
select populate(destination := 'vops_lineitem'::regclass, source := 'lineitem'::regclass);

-- For honest comparison creates the same projection without VOPS types
create table lineitem_projection as (select l_shipdate,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag::"char",l_linestatus::"char" from lineitem);

-- Now create mixed projection with partitioning keys:
create table vops_lineitem_projection(                                                                                    
   l_shipdate vops_date not null,
   l_quantity vops_float4 not null,
   l_extendedprice vops_float4 not null,
   l_discount vops_float4 not null,
   l_tax vops_float4 not null,
   l_returnflag "char" not null,
   l_linestatus "char" not null
);

-- And populate it with data sorted by partitioning key:
select populate(destination := 'vops_lineitem_projection'::regclass, source := 'lineitem_projection'::regclass, sort := 'l_returnflag,l_linestatus');


-- Let's measure time
\timing

-- Original Q6 query performing filtering with calculation of grand aggregate
select
    sum(l_extendedprice*l_discount) as revenue
from
    lineitem
where
    l_shipdate between '1996-01-01' and '1997-01-01'
    and l_discount between 0.08 and 0.1
    and l_quantity < 24;

-- VOPS version of Q6 using VOPS specific operators
select sum(l_extendedprice*l_discount) as revenue
from vops_lineitem
where filter(betwixt(l_shipdate, '1996-01-01', '1997-01-01')
		& betwixt(l_discount, 0.08, 0.1)
		& (l_quantity < 24));

-- Yet another vectorized version of Q6, but now in stadnard SQL:
select sum(l_extendedprice*l_discount) as revenue
from vops_lineitem
where l_shipdate between '1996-01-01'::date AND '1997-01-01'::date
   and l_discount between 0.08 and 0.1
   and l_quantity < 24;



-- Original version of Q1: filter + group by + aggregation
select 
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice*(1-l_discount)) as sum_disc_price,
    sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= '1998-12-01'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;

-- VOPS version of Q1, sorry - no final sorting
select reduce(map(l_returnflag||l_linestatus, 'sum,sum,sum,sum,avg,avg,avg',
    l_quantity,
    l_extendedprice,
    l_extendedprice*(1-l_discount),
    l_extendedprice*(1-l_discount)*(1+l_tax),
    l_quantity,
    l_extendedprice,
    l_discount)) from vops_lineitem where filter(l_shipdate <= '1998-12-01'::date);

-- Mixed mode: let's Postgres does group by and calculates VOPS aggregates for each group
select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice*(1-l_discount)) as sum_disc_price,
    sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    vops_lineitem_projection
where
    l_shipdate <= '1998-12-01'::date
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;



Performance evaluation

Now most interesting thing: compare performance results on original table and using vector operations on VOPS projection. All measurements were performed at desktop with 16Gb of RAM and quad-core i7-4770 CPU @ 3.40GHz processor with enabled hyper-threading. Data set for benchmark was generated by dbgen utility included in TPC-H benchmark. Scale factor is 10 which corresponds to about 8Gb database. It can completely fit in memory, so we are measuring best query execution time for warm data. Postgres was configured with shared buffer size equal to 8Gb. For each query we measured time of sequential and parallel execution with 8 parallel workers.

QuerySequential execution (msec)Parallel execution (msec)
Original Q1 for lineitem3802810997
Original Q1 for lineitem_projection338729656
Vectorized Q1 for vops_lineitem3372951
Mixed Q1 for vops_lineitem_projection1490396
Original Q6 for lineitem167964110
Original Q6 for lineitem_projection42791171
Vectorized Q6 for vops_lineitem875284

Conclusion

As you can see in performance results, VOPS can provide more than 10 times improvement of query speed. And this result is achieved without changing something in query planner and executor. It is better than any of existed attempt to speed up execution of OLAP queries using JIT (4 times for Q1, 3 times for Q6): Speeding up query execution in PostgreSQL using LLVM JIT compiler.

Definitely VOPS extension is just a prototype which main role is to demonstrate potential of vectorized executor. But I hope that it also can be useful in practice to speedup execution of OLAP aggregation queries for existed databases. And in future we should think about the best approach of integrating vectorized executor in Postgres core.

ALL sources of VOPS project can be obtained from this GIT repository. Chinese version of documentation is available here. Please send any feedbacks, complaints, bug reports, change requests to Konstantin Knizhnik.