In Cassandra, rows are hash partitioned by default. If you want data sorted by some attribute, Cassandra's column name sorting is usually exploited. If you look at the Cassandra slice range API, you will find that you can specify only the range start, the range end and an upper limit on the number of columns fetched.
However, in many applications the need is to paginate through the data, i.e., each call should fetch a predetermined number of items.
There is no easy way to map the desired number of items to be returned to the column name range, particularly when column names are not evenly distributed.
The only option is to select a range such that the number of items expected to be returned is far greater than the max limit. In this post I will discuss an adaptive range reader that I have implemented recently in agiato, hosted on github. It borrows ideas from feedback control systems to adaptively change the column range so that a predetermined number of items is returned.
Note on Cassandra Read
Reads in Cassandra are more expensive than writes, which may sound counterintuitive. Cassandra maintains multiple versions of a column in a persistent on-disk data structure called SSTable. There is also a corresponding in-memory data structure called MemTable, which contains the latest writes.
During the read for a column, Cassandra has to reconcile the different versions across the MemTable and one or more SSTables to find the latest column value.
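As a rough illustration of that reconciliation step, the sketch below merges the candidate versions of one column and keeps the one with the highest write timestamp. The Version class and the reconcile method are hypothetical names made up for this example, not Cassandra internals, and a real read does a good deal more work than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReadReconcileSketch {
    // One stored version of a column: its value plus the write timestamp.
    static final class Version {
        final String value;
        final long timestamp;

        Version(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Returns the version with the highest timestamp, or null if there are none.
    static Version reconcile(List<Version> memTableVersions, List<Version> ssTableVersions) {
        List<Version> all = new ArrayList<>(memTableVersions);
        all.addAll(ssTableVersions);
        Version latest = null;
        for (Version v : all) {
            if (latest == null || v.timestamp > latest.timestamp) {
                latest = v;
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Version> mem = Arrays.asList(new Version("v3", 300L));
        List<Version> sst = Arrays.asList(new Version("v1", 100L), new Version("v2", 200L));
        System.out.println(reconcile(mem, sst).value); // prints v3
    }
}
```

The point is only that every read has to look at several candidate versions, while a write simply appends one.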
Digging into Slice Range
Let’s take an example of time series data and consider the model where the data is sharded by hour. In this model, a row contains an hour’s worth of data. The column family has only simple columns. There are no super columns.
Each data item is in a column, where the column name is a long holding the number of milliseconds elapsed since the start of the hour in which the data item was generated. The column value is the actual data stored as a serialized byte stream. There could be up to 3.6 million columns in a row with a maximum data arrival rate of 1 per millisecond. The data model is as below. The row key is of the format yyyy-mm-dd-hh. Here is a detailed blog on how to model time series data in Cassandra in case you are interested.
2012-01-20-14 | …. | 73765 | 73769 | …. |
              | …. | xxxx  | xxxx  | …. |
2012-01-20-15 | …. | 71875 | 71879 | …. |
              | …. | xxxx  | xxxx  | …. |
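To make the model concrete, here is a minimal sketch of how a row key and a column name could be derived from an event timestamp. The class and method names are made up for this example, and it assumes timestamps are epoch milliseconds bucketed by UTC hour, which the model above does not spell out.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class TimeSeriesKeySketch {
    private static final long MILLIS_PER_HOUR = 3_600_000L;
    private static final DateTimeFormatter ROW_KEY_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");

    // Row key: the hour bucket the event falls into (UTC is an assumption).
    static String rowKey(long epochMillis) {
        return ROW_KEY_FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC));
    }

    // Column name: milliseconds elapsed since the start of that hour (0 to 3,599,999).
    static long columnName(long epochMillis) {
        return Math.floorMod(epochMillis, MILLIS_PER_HOUR);
    }

    public static void main(String[] args) {
        // An event at 14:01:13.765 UTC on 2012-01-20.
        long ts = ZonedDateTime.of(2012, 1, 20, 14, 1, 13, 765_000_000, ZoneOffset.UTC)
            .toInstant().toEpochMilli();
        System.out.println(rowKey(ts));     // prints 2012-01-20-14
        System.out.println(columnName(ts)); // prints 73765
    }
}
```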
Here is how the slice range constructor is defined. It is used in the Cassandra API call get_slice(). As you can tell, there is no way to set a range that guarantees the return of the expected number of items.
SliceRange(byte[] start, byte[] finish, boolean reversed, int count)
The only thing you can do is to set start low enough and finish high enough, and hope that the number of columns in that range is greater than count, so that the query returns count items. But there is no guarantee, and nobody likes hope-based logic.
In our example, with a fluctuating data arrival rate, it is difficult to select a slice range that yields the same predetermined number of columns on each call.
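The problem is easy to reproduce without a cluster. The sketch below stands in for a row with a sorted in-memory map and mimics a slice with a start, a finish and a count limit. The class and method names are made up for this example, and both range ends are treated as inclusive. The same range width returns very different numbers of columns depending on the local data density.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class FixedRangeSketch {
    // Mimics a slice: column names with start <= name <= finish, capped at count.
    static List<Long> slice(NavigableMap<Long, String> row, long start, long finish, int count) {
        List<Long> names = new ArrayList<>();
        for (Long name : row.subMap(start, true, finish, true).keySet()) {
            if (names.size() == count) {
                break;
            }
            names.add(name);
        }
        return names;
    }

    // A row with a dense burst followed by a sparse stretch.
    static NavigableMap<Long, String> sampleRow() {
        NavigableMap<Long, String> row = new TreeMap<>();
        for (long t = 0; t < 1000; t += 10) {
            row.put(t, "x");   // one column every 10 ms for the first second
        }
        for (long t = 1000; t < 5000; t += 500) {
            row.put(t, "x");   // one column every 500 ms for the next four seconds
        }
        return row;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> row = sampleRow();
        // Same range width (1000 ms) and same count limit in both calls.
        System.out.println(slice(row, 0, 999, 50).size());     // prints 50 (capped, 100 in range)
        System.out.println(slice(row, 1000, 1999, 50).size()); // prints 2
    }
}
```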
Adaptive Range Selection
RangeReader is adaptive because it adjusts itself to return a predictable number of items for each query execution. In the RangeReader class, the important constructor arguments related to range adjustment are as follows:
batchSize | desired number of columns to be returned |
maxFetchSize | maximum number of columns to be returned |
batchSizeTolerance | determines if range needs to be adjusted based on how much returned column count deviates from batch size |
startCol | start column |
initialRangeSize | initial range |
RangeReader tries to return a number of columns as close as possible to batchSize. However, the actual number of columns returned could be anywhere between 0 and maxFetchSize.
It works like a feedback loop. Before executing the query, it checks the result of the last query invocation. If the last query invocation returned fewer than batchSize columns, the range is increased proportionally; if it returned more, the range is decreased proportionally.
If the last query execution returned no data, the range is doubled, and it keeps growing nonlinearly until the query returns some data.
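The adjustment rule described above can be sketched as a small pure function. This is a simplified illustration, not the RangeReader code: the names are made up, and the tolerance is treated as an absolute column count, which is an assumption about how batchSizeTolerance is interpreted.

```java
public class RangeAdjustSketch {
    // Returns the range size to use for the next query, given the last result.
    static long nextRangeSize(long curRangeSize, int lastFetchCount, int batchSize, int tolerance) {
        if (lastFetchCount == 0) {
            return curRangeSize * 2;   // nothing came back: double the range
        }
        if (Math.abs(lastFetchCount - batchSize) <= tolerance) {
            return curRangeSize;       // close enough to batchSize: leave the range alone
        }
        // Scale the range so that, at the same local density, about batchSize columns return.
        return Math.max(1L, curRangeSize * batchSize / lastFetchCount);
    }

    public static void main(String[] args) {
        System.out.println(nextRangeSize(1000, 50, 100, 10));  // prints 2000 (too few, widen)
        System.out.println(nextRangeSize(1000, 400, 100, 10)); // prints 250 (too many, narrow)
        System.out.println(nextRangeSize(1000, 95, 100, 10));  // prints 1000 (within tolerance)
        System.out.println(nextRangeSize(1000, 0, 100, 10));   // prints 2000 (empty, double)
    }
}
```

Because the correction uses only the previous fetch, it tracks the local density of columns and will lag behind when the arrival rate changes abruptly.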
Here is the main query method that the client application calls repeatedly to navigate through the columns in a row.
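[php]
// Note: part of this listing was lost when the post was published. Lines marked
// "assumed" are reconstructed from the description above and may differ from
// the real code; refer to the full source for the actual implementation.
public List getColumnValues() throws Exception {
    if (!atRowEnd) {
        // set column range
        setEndCol();
        colRange.clear();
        colRange.add(startCol);
        colRange.add(endCol);

        // range query
        colValues = dataAccess.retrieveSubColumns(rowKey, superCol,
            colRange, true, maxFetchSize, consLevel);
        // assumed: the row is exhausted once the final range returns a short batch
        atRowEnd = endCol == endMarker && colValues.size() < maxFetchSize;
        // (updating lastFetchCount and startCol for the next call is not shown)
    }
    return colValues; // assumed
}

private void setEndCol() { // assumed method boundary
    if (lastFetchCount >= 0) {
        if (lastFetchCount > batchSizeMax || lastFetchCount < batchSizeMin) { // batchSizeMin assumed
            // assumed: scale proportionally, or double when nothing came back
            curRangeSize = lastFetchCount > 0
                ? curRangeSize * batchSize / lastFetchCount
                : 2 * curRangeSize;
        }
    }
    if (endMarkerLong > startColLong) { // left operand assumed
        endColLong = startColLong + curRangeSize;
        endCol = Util.getByteBufferFromLong(endColLong);
    } else {
        endCol = endMarker;
    }
}[/php]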
The full source code can be viewed here. It uses a simple greedy algorithm to adjust the range based on the local distribution of data. Currently it supports only LongType columns. In the future I will add support for UTF8Type.
Pagination
Although the RangeReader class makes a best effort to return batchSize records, that may not be good enough if you want exactly batchSize columns to be returned.
One way to achieve pagination is to have a Paginator class which does some extra buffering on the results returned by RangeReader.
If the number of columns returned by RangeReader is less than batchSize, it makes repeated calls to RangeReader until batchSize columns have been fetched. On the other hand, if the number of columns returned by RangeReader is more than batchSize, only batchSize columns are returned to the client and the rest are cached. I will be implementing the Paginator class in the near future. So, stay tuned.
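Until then, here is a minimal sketch of the buffering idea. It is not the planned Paginator: the class name is made up, the reader is abstracted as a Supplier standing in for RangeReader.getColumnValues(), and a null return is assumed to signal the end of the row.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class PaginatorSketch<T> {
    private final Supplier<List<T>> reader; // stands in for the range reader
    private final int batchSize;
    private final Deque<T> buffer = new ArrayDeque<>();
    private boolean exhausted = false;

    PaginatorSketch(Supplier<List<T>> reader, int batchSize) {
        this.reader = reader;
        this.batchSize = batchSize;
    }

    // Returns exactly batchSize items, or fewer only when the row is exhausted.
    List<T> nextPage() {
        while (buffer.size() < batchSize && !exhausted) {
            List<T> fetched = reader.get();
            if (fetched == null) {
                exhausted = true;        // assumed end-of-row signal
            } else {
                buffer.addAll(fetched);  // an empty batch just means "ask again"
            }
        }
        List<T> page = new ArrayList<>();
        while (page.size() < batchSize && !buffer.isEmpty()) {
            page.add(buffer.poll());     // surplus items stay cached for the next page
        }
        return page;
    }

    public static void main(String[] args) {
        Iterator<List<Integer>> batches = Arrays.asList(
            Arrays.asList(1, 2), Arrays.asList(3, 4, 5, 6, 7)).iterator();
        PaginatorSketch<Integer> pages =
            new PaginatorSketch<Integer>(() -> batches.hasNext() ? batches.next() : null, 3);
        System.out.println(pages.nextPage()); // prints [1, 2, 3]
        System.out.println(pages.nextPage()); // prints [4, 5, 6]
        System.out.println(pages.nextPage()); // prints [7]
    }
}
```

Note that the loop relies on the reader eventually returning either data or the end-of-row signal. A reader that returns empty batches forever would need an explicit retry limit.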
Originally posted here.