On Columnar Formats and Encoding

columnar
vertica
redshift
etl

(Scott Hoover) #1

Introduction

Encoding is an important concept in columnar databases, like Redshift and Vertica, as well as database technologies that can ingest columnar file formats like Parquet or ORC. Particularly for the case of Redshift and Vertica—both of which allow one to declare explicit column encoding during table creation—this is a key concept to grasp. In this article, we will cover (i) how columnar data differs from traditional, row-based RDBMS storage; (ii) how column encoding works, generally; then we’ll move on to discuss (iii) the different encoding algorithms and (iv) when to use them.

Row Storage

Most databases store data on disk in sequential blocks. When data are queried, the disk is scanned and the relevant data are retrieved. With a traditional RDBMS (e.g., MySQL or PostgreSQL), data are stored in rows, with multiple rows per block. Consider the table events:

| id | user_id | event_type | ip | created_at | uri | country_code |
|----|---------|------------|----|------------|-----|--------------|
| 1 | 1 | search | 122.303.444 | t_{0} | /search/products/filters=tshirts | US |
| 2 | 1 | view | 122.303.444 | t_{1} | /products/id=11212 | US |
| 3 | 1 | add_to_cart | 122.303.444 | t_{2} | /add_to_cart/product_id=11212 | US |
| 4 | 3 | search | 121.321.484 | t_{1} | /search/products/filters=pants | FR |
| 5 | 3 | search | 121.321.484 | t_{2} | /search/products/filters="hammer pants" | FR |
| 6 | 3 | search | 121.321.484 | t_{3} | /search/products/filters="parachute pants" | FR |
| 7 | 9 | checkout | 121.322.432 | t_{2} | /checkout/cart=478/status=confirmed | UK |
With a traditional RDBMS, these rows will be stored as tuples, with each new row delimited in some fashion (in this case, I'm using a semicolon and a line break for illustrative purposes):

```sql
1,1,search,122.303.444,t_{0},/search/products/filters=tshirts,US; --row 1
2,1,view,122.303.444,t_{1},/products/id=11212,US; --row 2
...;
7,9,checkout,121.322.432,t_{2},/checkout/cart=478/status=confirmed,UK; --row 7
```

Indexing speeds up various operations because the database can skip over entire blocks very quickly, minimizing seek time. Let's assume there is an index on `user_id`. Let's also assume that the three events for user 1 are stored in block a, the three events for user 3 are stored in block b, and the one event for user 9 is stored in block c. Finally, let's suppose we issue the following query (Query 1) to the database:
```sql
select *
from events
where user_id = 3
```

In this case, our index would allow the database to skip the first block, fetch from the second block, and stop there in order to get the data it needs for this query.

Now, suppose we issue a more analytical query (Query 2) to this database:

```sql
select
    created_at::date as created_date
    , count(1) as search_events
from events
where event_type = 'search'
group by 1
order by 1
```

In this case, we don’t really benefit from our index on user_id, and we don’t have an index declared on event_type, so our query will scan all of the rows across all of the blocks for the information it needs, identifying predicate matches on rows 1, 4, 5, and 6, then performing the aggregation and group by. This would be a relatively expensive operation on a large data set.

Columnar Storage

In columnar databases and in columnar file formats, data are transformed such that values of a particular column are stored adjacently on disk. Revisiting how our data are stored, this time in columnar format, we’d expect it to look like this:

```
1,2,3,4,5,6,7; --id
1,1,1,3,3,3,9; --user_id
search,view,add_to_cart,search,search,search,checkout; --event_type
...; --misc columns
US,US,US,FR,FR,FR,UK; --country_code
```

Again, the contiguous values in a column can be co-located in the same blocks. Let’s assume that id, user_id, and event_type are located in block a, ip and created_at are located in block b, and uri and country_code are located in block c.

Revisiting Query 2 above, we can see that the database would only need to access the data in blocks a and b in order to execute our query (and in both cases, we’d only need to scan a subset of the data in each block). Conversely, Query 1 would require that the database scan all blocks on disk to marshal the values of every column associated with the rows that match our filter predicate.
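To make the block-access accounting concrete, here is a toy Python model of the two layouts. The block assignments mirror the assumptions above; the block names and the dict-based "index" are purely illustrative, not how any real engine tracks blocks.

```python
# Row store: each block holds complete rows (most columns omitted here).
row_blocks = {
    "a": [{"user_id": 1, "event_type": "search"},
          {"user_id": 1, "event_type": "view"},
          {"user_id": 1, "event_type": "add_to_cart"}],   # rows 1-3
    "b": [{"user_id": 3, "event_type": "search"},
          {"user_id": 3, "event_type": "search"},
          {"user_id": 3, "event_type": "search"}],        # rows 4-6
    "c": [{"user_id": 9, "event_type": "checkout"}],      # row 7
}
# An index on user_id maps each value to the block(s) holding matching rows,
# so Query 1 (where user_id = 3) reads block b and nothing else.
user_id_index = {1: ["a"], 3: ["b"], 9: ["c"]}
print(user_id_index[3])                                   # ['b']

# Column store: each block holds whole columns.
column_blocks = {
    "a": {"id", "user_id", "event_type"},
    "b": {"ip", "created_at"},
    "c": {"uri", "country_code"},
}
# Query 2 needs only event_type and created_at, so blocks a and b suffice;
# Query 1's select * would touch every block.
needed = {"event_type", "created_at"}
print(sorted(blk for blk, cols in column_blocks.items() if cols & needed))   # ['a', 'b']
```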

We can see, when comparing these two types of data stores, that the clear benefit of row-based storage comes from operations that query, update, insert, or delete many columns for specific rows or ranges of rows, provided indexes are in place. This is why these database technologies remain so prominent for OLTP workloads. Columnar storage, alternatively, favors queries that operate on a subset of columns, typically accompanied by aggregations and filters. The benefits of columnar databases are improved further by introducing the notion of compression, or encoding.

Column Encoding

Because row-based storage may often store heterogeneous data adjacently, compression options are limited. In our example, in a given row, the user_id, an integer, is stored directly next to event_type, a character, meaning a compression algorithm that’s particularly well suited for integers cannot be used on the entire row, and the same holds for a compression algorithm that’s particularly well suited for strings. Column stores, however, are bound to store relatively homogeneous data adjacently—at the very least, the explicit type of data is the same. This fact allows us to apply type-specific encoding, reducing the data’s impact on space and improving retrieval speeds when querying the data. Revisiting our columnar data stored on disk, let’s assume that we apply type-specific encoding and ignore the specifics of implementation for the time being.

Conceptually, our data would start like this:

```
1,2,3,4,5,6,7;
1,1,1,3,3,3,9;
search,view,add_to_cart,search,search,search,checkout;
...;
US,US,US,FR,FR,FR,UK;
```

and, once compressed, reduce to this:

```
1,2,3,4,5,6,7;
1,3,9;
search,view,add_to_cart,checkout;
...;
US,FR,UK;
```

Obviously, this is not the full story; but this simplistic example should convey how columnar storage coupled with some form of encoding can reduce the amount of data stored on disk, sometimes dramatically.

Encoding Types

Let $|C|$ denote the cardinality of a column, where cardinality is defined as the number of elements in a set. For example, the set $\{a, b, c\}$, derived from the tuple $(a, a, b, b, c, c)$, has cardinality $|C| = 3$. Each of the encoding types listed below represents a class of encoding. Which is to say, there may be implementations, or codecs, for each class that can be applied to a range of data types. (In the following examples, the visualizations are meant to convey the general idea of the encoding type and may not accurately represent the specifics of how the data are stored on disk.)

Runlength

Runlength encoding reformats the data to contain the value itself accompanied by the number of times that value is repeated on subsequent rows (i.e., the length of its run) until interrupted by a new value. While we might expect ranges of alternating values, row over row, when the data are essentially random, for ordered or naturally clustered data we’d expect to see long runs of repeated values.

```
US,US,US,FR,FR,FR,UK  -->  (US,3),(FR,3),(UK,1)
```
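As a rough sketch of the idea (not any particular database's codec), a run-length encoder and decoder might look like this:

```python
# Toy run-length codec: store each value once, together with the length of its run.
def rle_encode(values):
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1] = (v, runs[-1][1] + 1)   # extend the current run
        else:
            runs.append((v, 1))               # start a new run
    return runs

def rle_decode(runs):
    return [v for v, count in runs for _ in range(count)]

country_code = ["US", "US", "US", "FR", "FR", "FR", "UK"]
encoded = rle_encode(country_code)
print(encoded)                                # [('US', 3), ('FR', 3), ('UK', 1)]
assert rle_decode(encoded) == country_code
```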

Dictionary

With dictionary encoding, the general idea is that a hash map or dictionary is created in which each distinct value in the original data is mapped to an integer. All of the original data are then replaced with the corresponding integers. Particularly in the case of strings, the integer replacements compress down to relatively few bytes.

```
search,view,add_to_cart,search,search,search,checkout
dictionary:  {search: 0, view: 1, add_to_cart: 2, checkout: 3}
encoded:     0,1,2,0,0,0,3
```
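A minimal sketch of the same idea, ignoring how real codecs pack the integers and store the dictionary on disk:

```python
# Toy dictionary codec: map each distinct value to a small integer and store
# only the integers (plus the dictionary needed to reverse the mapping).
def dict_encode(values):
    dictionary, encoded = {}, []
    for v in values:
        encoded.append(dictionary.setdefault(v, len(dictionary)))
    return dictionary, encoded

def dict_decode(dictionary, encoded):
    reverse = {code: v for v, code in dictionary.items()}
    return [reverse[code] for code in encoded]

event_type = ["search", "view", "add_to_cart", "search", "search", "search", "checkout"]
dictionary, encoded = dict_encode(event_type)
print(dictionary)   # {'search': 0, 'view': 1, 'add_to_cart': 2, 'checkout': 3}
print(encoded)      # [0, 1, 2, 0, 0, 0, 3]
assert dict_decode(dictionary, encoded) == event_type
```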

Delta

Delta encoding starts off by copying the first entry in the column. Subsequent values, however, are populated by taking the difference between the current value and the preceding value. We can see that this has the potential to take large values (in terms of bytes) and reduce their size dramatically. In our simple example, to recover the value at any point in the column, we simply sum the stored entries up to and including that point.

```
1,2,3,4,5,6,7  -->  1,1,1,1,1,1,1
```
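A minimal sketch, using the monotonically increasing id column from our example:

```python
# Toy delta codec: keep the first value, then store row-over-row differences.
def delta_encode(values):
    return values[:1] + [curr - prev for prev, curr in zip(values, values[1:])]

def delta_decode(deltas):
    values = deltas[:1]
    for d in deltas[1:]:
        values.append(values[-1] + d)
    return values

ids = [1, 2, 3, 4, 5, 6, 7]
print(delta_encode(ids))                      # [1, 1, 1, 1, 1, 1, 1]
assert delta_decode(delta_encode(ids)) == ids
```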

Sliding-Window Codecs (LZ77, LZSS, LZO, DEFLATE, etc.)

The basic idea with this class of algorithm is that runs of characters previously seen are replaced with a reference to said previously seen run. The reference itself is some sort of tuple containing the offset (i.e., the starting position of the earlier run), the run length, and (optionally) the next character that deviates from the run. Consider the following stream of text:

```
foo bar
bar baz
foobarbaz
```

The encoded version might look something like this:

```
foo bar
(4,3) baz
(0,3)(4,3)(12,3)
```

While this example might not look like much space is conserved, the savings can be considerable on larger streams of data.
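To make the mechanics concrete, here is a toy encoder and decoder in the same spirit; it searches all previously seen text rather than a bounded window, so its exact output may differ slightly from the hand-worked example above:

```python
# Toy LZ77-style codec (illustrative only; real codecs such as LZO or DEFLATE
# add bounded windows, hashing, and bit-level packing). References are
# (offset, length) pairs into the text already emitted.
def lz_encode(text, min_match=3):
    tokens = []
    i = 0
    while i < len(text):
        best_off, best_len = 0, 0
        for j in range(i):                       # naive longest-match search
            length = 0
            while (j + length < i
                   and i + length < len(text)
                   and text[j + length] == text[i + length]):
                length += 1
            if length > best_len:
                best_off, best_len = j, length
        if best_len >= min_match:
            tokens.append((best_off, best_len))  # back-reference
            i += best_len
        else:
            tokens.append(text[i])               # literal character
            i += 1
    return tokens

def lz_decode(tokens):
    out = []
    for tok in tokens:
        if isinstance(tok, tuple):
            off, length = tok
            out.extend(out[off:off + length])
        else:
            out.append(tok)
    return "".join(out)

stream = "foo bar\nbar baz\nfoobarbaz"
tokens = lz_encode(stream)
assert lz_decode(tokens) == stream
print(tokens)   # literals for "foo bar\n", then references such as (4, 3) for "bar"
```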

Encoding Strategies

Runlength

For data whose cardinality $|C|$ is low, we might reasonably expect to see many runs, making this a good choice of encoding. Typically, this might be a boolean, binary categorical (e.g., male, female), or low-cardinality categorical (e.g., graduate degree, bachelors, some college, high school) column.

Dictionary

Dictionary encoding works well when the cardinality is a bit larger than what’s appropriate for runlength encoding, but not so large as to produce a massive dictionary with slow lookups. Also, the values in the dictionary, in most implementations, have a fairly limited length, so larger varchars tend to be poor choices here. As a loose rule, I recommend a cardinality between 10 and 10,000. Dictionary encoding is particularly well suited for states, countries, job titles, etc.

Delta

Delta encoding can be applied to a variety of data types. In our simplistic example, we can see that monotonically increasing or decreasing values would likely compress down dramatically. There are also delta codecs meant to handle potentially large row-over-row differences (e.g., delta32k in Redshift).

Sliding Window

This class of algorithm is very well suited to longer streams of text: for example, raw text from documents or support chats, browser user-agent strings, etc.

Further Considerations

Tuple Reconstruction

Wherever possible, the database is going to perform operations on a column-by-column basis rather than on tuples. This allows for tight, CPU-friendly for loops, improved parallelization, and vectorization. On the other hand, when a client renders a result set or when a result set is returned via JDBC or ODBC, the individual columns need to be reconstructed into tuples in order to display the results in a tabular format. To achieve this, modern columnar databases are likely to employ some form of late materialization or tuple reconstruction.

The general idea is this: most execution engines will perform filters on the columns in the where clause, one by one, noting the row positions of the filter-predicate matches in each step and passing the positions along to the next filter operation. When it comes time to join (and assuming some filtering in the query is actually present), only a subset of rows from each table is used in the join. The join itself only requires the column(s) explicitly included in the join predicate. When filtering and joining are done, the execution engine is left with a number of arrays, one for each table in the join, which together form the positional references for the ultimate result set. At this point, the relevant attribute values from the columns across the tables are stitched together to create the tuples that make up the result set.

To illustrate, suppose that we continue with our example events table and also add a user look-up table, users, so that our schema now looks like this:

[events]

| id | user_id | event_type | ip | created_at | uri | country_code |
|----|---------|------------|----|------------|-----|--------------|
| 1 | 1 | search | 122.303.444 | t_{0} | /search/products/filters=tshirts | US |
| 2 | 1 | view | 122.303.444 | t_{1} | /products/id=11212 | US |
| 3 | 1 | add_to_cart | 122.303.444 | t_{2} | /add_to_cart/product_id=11212 | US |
| 4 | 3 | search | 121.321.484 | t_{1} | /search/products/filters=pants | FR |
| 5 | 3 | search | 121.321.484 | t_{2} | /search/products/filters="hammer pants" | FR |
| 6 | 3 | search | 121.321.484 | t_{3} | /search/products/filters="parachute pants" | FR |
| 7 | 9 | checkout | 121.322.432 | t_{2} | /checkout/cart=478/status=confirmed | UK |

[users]

| id | created_at | age |
|----|------------|-----|
| 1 | … | 32 |
| 2 | … | 29 |
| 3 | … | 21 |
| 4 | … | 38 |
| 5 | … | 33 |
| 6 | … | 32 |
| 7 | … | 27 |
| 8 | … | 26 |
| 9 | … | 30 |

Now, let’s suppose that we issue the following query.

```sql
select count(1) as search_events
from   events
join   users
on     events.user_id = users.id
where  events.event_type = 'search'
and    events.created_at < t_{3}
and    users.age < 30
```

Here’s how our database might process this query, step by step:

Step 1—Filter. Create a bit string or array that references the rows in events.event_type that equal 'search'. We will refer to this as our “position list.”

```
events.event_type:  search, view, add_to_cart, search, search, search, checkout
position list:      [1, 4, 5, 6]
```

Step 2—Reconstruct. We have to filter on our second condition against the events table, but we also know, from our previous step, that we only have to check our second condition on a subset of rows. In other words, before we filter on the entire events.created_at column, we can and should reduce our search to a subset of rows found in our position list.

```
position list:      [1, 4, 5, 6]
events.created_at:  t_{0}, t_{1}, t_{2}, t_{3}
```

Step 3—Filter. Now we can evaluate the second filter, events.created_at < t_{3}, again returning a position list considering both of the preceding filter predicates.

```
events.created_at (at positions 1, 4, 5, 6):  t_{0}, t_{1}, t_{2}, t_{3}
position list:                                [1, 4, 5]
```

Step 4—Filter. Our final step before joining is obtaining a position list for the users table based on the filter users.age < 30.

```
users.age:      32, 29, 21, 38, 33, 32, 27, 26, 30
position list:  [2, 3, 7, 8]
```

Step 5—Join. We know that our join involves two columns: events.user_id and users.id. We also have two position lists from our filters that restrict the possible rows in play and, therefore, the values we have to match in the join. (In the join illustration, we have both the position index and the value itself, indicated by i and v like so: (i, v).)

```
events.user_id (i, v):  (1, 1), (4, 3), (5, 3)
users.id (i, v):        (2, 2), (3, 3), (7, 7), (8, 8)
matches:                events (4, 3) and (5, 3)  <->  users (3, 3)
```

At this point, we have the row positions that we’ll need from both tables in order to project any column from either table. Our original example select count(1) from ... would yield 2. We could easily turn this into something like select users.age, events.country_code from ... and have our row positions projected onto these two columns in order to extract the information we need, yielding:

| age | country_code |
|-----|--------------|
| 21 | FR |
| 21 | FR |
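Putting the five steps together, here is a small sketch of the late-materialization flow over our example tables; the columns and 1-indexed positions mirror the walkthrough, with the t_{k} timestamps represented as plain integers (real engines, of course, operate on compressed blocks rather than Python lists):

```python
# Columns from the example tables (created_at holds t_0..t_3 as integers).
events = {
    "user_id":      [1, 1, 1, 3, 3, 3, 9],
    "event_type":   ["search", "view", "add_to_cart", "search", "search", "search", "checkout"],
    "created_at":   [0, 1, 2, 1, 2, 3, 2],
    "country_code": ["US", "US", "US", "FR", "FR", "FR", "UK"],
}
users = {
    "id":  [1, 2, 3, 4, 5, 6, 7, 8, 9],
    "age": [32, 29, 21, 38, 33, 32, 27, 26, 30],
}

def positions(column, predicate, candidates=None):
    """Return 1-indexed positions where the predicate holds, optionally
    restricted to an existing position list (the late-materialization trick)."""
    if candidates is None:
        candidates = range(1, len(column) + 1)
    return [pos for pos in candidates if predicate(column[pos - 1])]

# Steps 1-3: filter events, narrowing a single position list as we go.
ev_pos = positions(events["event_type"], lambda v: v == "search")             # [1, 4, 5, 6]
ev_pos = positions(events["created_at"], lambda v: v < 3, candidates=ev_pos)  # [1, 4, 5]

# Step 4: filter users.
us_pos = positions(users["age"], lambda v: v < 30)                            # [2, 3, 7, 8]

# Step 5: join on events.user_id = users.id using only the surviving positions.
users_by_id = {users["id"][p - 1]: p for p in us_pos}
matches = [(e, users_by_id[events["user_id"][e - 1]])
           for e in ev_pos
           if events["user_id"][e - 1] in users_by_id]

# Only now do we stitch attribute values into result tuples.
result = [(users["age"][u - 1], events["country_code"][e - 1]) for e, u in matches]
print(len(matches), result)   # 2 [(21, 'FR'), (21, 'FR')]
```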

This is the basic idea of late materialization in tuple reconstruction. There are early materialization approaches where the tuples are stitched together much earlier in the process, such that later operations are applied to the entire tuple. In many situations, late materialization improves performance. We can see, however, that it is associated with many join-like operations between the position lists and the values in the columns themselves. Good cost-based optimizers may weigh both late and early materialization plans to see which has a lower overall cost. In the case that large tables get filtered down to reasonably small result sets, late materialization may prove to be the faster option.

Operations on Compressed Data

Column stores may have the ability to perform certain operations on compressed data, yielding even better performance in certain circumstances. In other cases, the data must first be decompressed before an operation can use it. One must be mindful of which columns one chooses to compress, with particular consideration for how the data will likely be used.

For columns that only ever appear in the projection or restriction, and which are not used for joins, the execution engine may be able to work on the compressed data. For example, if we use runlength encoding on a column, the average could be computed using strictly the compressed data with the following formula:

$$\text{avg} = \frac{\sum_{i} v_i \cdot o_i}{\sum_{i} o_i}$$

where each pair $(v_i, o_i)$ captures the value, $v_i$, of a given row and the number of occurrences, $o_i$, of that value on following rows.

More concretely, if we had a column of ages that run-length encodes to the pairs (21, 2), (25, 3), (30, 1), our average could be computed like so:

$$\text{avg} = \frac{21 \cdot 2 + 25 \cdot 3 + 30 \cdot 1}{2 + 3 + 1} = \frac{147}{6} = 24.5$$
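As a sketch, the same weighted average can be computed directly on the run-length-encoded pairs without ever expanding them (using the illustrative (value, occurrences) pairs from above):

```python
# Compute an average directly on run-length-encoded (value, occurrences) pairs.
def rle_avg(runs):
    total = sum(value * occurrences for value, occurrences in runs)
    count = sum(occurrences for _, occurrences in runs)
    return total / count

ages = [(21, 2), (25, 3), (30, 1)]
print(rle_avg(ages))   # 24.5, the same as averaging the fully decoded column
```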

If a column is used in a join, however, the execution engine will almost certainly have to decompress it before it can be used. This is expensive and should be avoided if possible. For Redshift, similar complications make compressing sort and distribution keys a risky proposition.

Column Metadata

Modern columnar databases, as well as columnar file formats, will store column metadata in the file(s) alongside the column data itself. This allows certain operations to avoid touching the underlying data altogether, potentially improving performance.

Let’s suppose that each block on disk maps to one file and contains some number of columns. Furthermore, let’s assume that each column is accompanied by the following summary statistics: min, max, and count. (Obviously, some stats only make sense for certain data types, but we’ll ignore that detail for the time being.) If I were to issue the following query

```sql
select max(some_column)
from my_table
```

conceivably, we could get this figure by taking the max of the “max” summary statistic for this column across all blocks, without scanning the column itself. Similarly, if we issued

```sql
select count(1)
from my_table
where some_column > x
```

we could quickly discard blocks of data whose “max” summary statistic did not exceed x.
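Here is a toy sketch of that pruning logic, assuming each block carries min, max, and count statistics for the column; the blocks and values below are made up for illustration:

```python
# Toy zone-map pruning: per-block min/max/count statistics let us skip blocks,
# or skip touching the underlying data entirely.
blocks = [
    {"min": 3,  "max": 40,  "count": 4, "values": [3, 17, 22, 40]},
    {"min": 41, "max": 75,  "count": 3, "values": [41, 60, 75]},
    {"min": 76, "max": 120, "count": 2, "values": [76, 120]},
]

# select max(some_column): answered from the metadata alone.
print(max(b["max"] for b in blocks))                              # 120

# select count(1) where some_column > x: scan only blocks whose max exceeds x.
x = 70
candidates = [b for b in blocks if b["max"] > x]                  # the last two blocks
print(sum(1 for b in candidates for v in b["values"] if v > x))   # 3
```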

Note: For this reason, it’s important to run `analyze` on tables regularly within Redshift, as it updates column metadata.


(Brayden Cleary) #2

Thanks for the post, Scott. Am about to apply encoding to a handful of Redshift tables and your post was a useful primer in the subject.

Re “Tuple Reconstruction”, any chance you can run through how early materialization would work given the same query example…I’m assuming early materialization would recreate all rows that match the join condition events.user_id = users.id and then filter based on the where clauses?


(Scott Hoover) #3

@BraydenC I’m glad you found the post useful. I’d gladly step through a potential early materialization strategy. I’ll update the post within a few days. However, in summary, we might expect to see tuples being constructed at each step, in a sort of cumulative fashion, so as to avoid re-accessing columns.

For example, for a given query, we may filter on a column that’s also included in the join. In the case of late materialization, our plan may first do the restriction, move on to intermediate steps, and then re-access the column for the purpose of a join. With this approach, there is potentially an added expense from the re-accessing of the column. Early materialization would filter on the column in question, then send along both the column data as well as the position list to the next steps, building up the tuple along the way.
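To make that concrete, here is a rough sketch of the cumulative, tuple-carrying approach using the example tables from the post; it illustrates the strategy, not Redshift’s actual plan (the t_{k} timestamps are again represented as plain integers):

```python
events_cols = {
    "user_id":      [1, 1, 1, 3, 3, 3, 9],
    "event_type":   ["search", "view", "add_to_cart", "search", "search", "search", "checkout"],
    "created_at":   [0, 1, 2, 1, 2, 3, 2],
    "country_code": ["US", "US", "US", "FR", "FR", "FR", "UK"],
}
users_cols = {"id": [1, 2, 3, 4, 5, 6, 7, 8, 9],
              "age": [32, 29, 21, 38, 33, 32, 27, 26, 30]}

# Materialize full tuples up front, then operate row at a time from here on.
events_rows = [dict(zip(events_cols, row)) for row in zip(*events_cols.values())]
users_rows = [dict(zip(users_cols, row)) for row in zip(*users_cols.values())]

# Filters carry whole tuples, not just position lists, to the next step.
events_rows = [r for r in events_rows
               if r["event_type"] == "search" and r["created_at"] < 3]   # created_at < t_3
users_rows = [r for r in users_rows if r["age"] < 30]

# The join then works on already-materialized tuples.
result = [(u["age"], e["country_code"])
          for e in events_rows
          for u in users_rows
          if e["user_id"] == u["id"]]
print(len(result), result)   # 2 [(21, 'FR'), (21, 'FR')]
```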

Here’s the tradeoff: if intermediate result sets are quite large, they’ll likely spill over to disk. This necessarily slows query times, but is unavoidable in certain cases. With that said, early materialization is more likely to spill over to disk than late materialization. On the other hand, complex queries with many joins may result in lots of steps. With the join-like operations associated with late materialization, this may slow query times. Thankfully, Redshift’s execution engine is going to handle this tradeoff for you (at least, as best as it can).

If you have flexibility in how your schema is designed, you can look to the system tables to see which queries are spilling over and how that impacts query times. In certain cases, you might be able to flatten the schema during ETL to eliminate joins, or you can suggest different filtering strategies for your users, etc.

```sql
/*
number of disk-based steps in a given query
*/
select count(1) as number_of_disk_based_steps
from   stl_save
where  query = YOUR_QUERY_ID
and    is_diskbased = 't';

/*
query-planner alerts and solutions for a given query
*/
select distinct event
     , solution
from   stl_alert_event_log
where  query = YOUR_QUERY_ID;
```