Timeseries APIs on a dime with Node, Tastypie and MySQL
Time series data is quickly becoming all the rage in big data circles. The primary use case for large amounts of time series data tends to be visualization of collected metrics. This could be the temperature of your house, the CPU usage of a remote server, the oil level of your car, and so on. In a nutshell, time series data is:
- Data over a continuous time interval
- Data contains successive measurements across that interval
- Data uses equal spacing between every two consecutive measurements
- Each time unit within the interval has at most one data point
It might look something like this:
[
  {
    time: '2016-01-01 00:00:00', // minute 0
    value: 1
  },{
    time: '2016-01-01 00:01:00', // minute 1
    value: 2
  }
]
The data is sequential and continuous, and each entry contains a single data point along with the time at which it was recorded. Just by looking at it you can see how well it maps to graphing: time is usually the x axis and the value is the y axis. Let's say, for our example, we are running a small e-commerce store, we have been collecting sales data for the past few years, and we would like to visualize profits against certain metrics.
For example, do we make more money selling Hanes or Fruit of the Loom? Do we make more money from things that are white or things that are black? And so on. Of course we didn't plan for this, and all of this data is normalized across a bunch of tables in our database. We have over 10 million records, with a good number of foreign keys, and we need to make it performant. We could, of course, set up a non-relational time series database like InfluxDB or OpenTSDB, figure out an ETL layer using some or all of the Hadoop ecosystem, and slurp our data into it. But that is a lot of work that I just don't want to do, nor have time for!
Current Table Setup
- user - The user selling something
- order - The incoming order for a user
- order items - The individual items on an order
- variant - A product variation ( product + size + color )
- products - A single sellable item
- colors - Color names and hex codes
- sizes - Sizes and the extra costs associated with a size ( L, XL, etc. )
ETL [ee 'tee ehl] -n, --noun.
Extract Transform Load, a process in database usage that:
- Extracts data from homogeneous or heterogeneous data sources
- Transforms the data into the proper format for storage
- Loads the transformed data into the final storage destination
The primary killer of performance in our situation is all of the joins we have to perform on a table with 10 million records. What we really want to do is make a new table to hold a de-normalized subset of that data and query that instead. Recently I was turned on to a shiny gem in MySQL called scheduled events, which are basically cron jobs at the database level that run a query. That is exactly what I want. We can write a simple event to do the heavy query every hour. We are going to need a couple of tables to pull this off.
Database
- Table to keep track of the last time each job was executed
- Table to store the results of each job we create
Our job tracking table is simple: an id, a job name and a last run time. We use it to keep track of when each job last ran, so we only sample data that hasn't yet been sampled.
Table Layout
-- create ETL job table
CREATE TABLE `timeseries_etl_job` (
`timeseries_etl_job_id` int(11) NOT NULL AUTO_INCREMENT,
`job_name` varchar(100) NOT NULL,
`job_last_run` datetime NOT NULL DEFAULT '1970-01-01 00:00:00',
PRIMARY KEY (`timeseries_etl_job_id`),
UNIQUE KEY `job_name_UNIQUE` (`job_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- table for product specific data
CREATE TABLE IF NOT EXISTS `timeseries_trend_product` (
`timeseries_trend_product_id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`time` date NOT NULL,
`value` int(11) NOT NULL DEFAULT '0',
`order_item_id` int(11) NOT NULL,
`size` varchar(7) NOT NULL,
`color` varchar(45) NOT NULL,
`color_hex` varchar(7) NOT NULL,
`tax` int(11) NOT NULL DEFAULT '0',
`product` varchar(255) NOT NULL,
`product_id` int(11) NOT NULL,
`user_id` int(11) NOT NULL,
`quantity_sold` int(11) NOT NULL DEFAULT '0',
`total_price` int(11) NOT NULL DEFAULT '0',
PRIMARY KEY (`timeseries_trend_product_id`),
KEY `idx_user_time` (`user_id`,`time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
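One thing the table definitions alone don't cover: the event below looks up and updates the timeseries_etl_job row by name, so that row has to exist before the first run. A minimal seed, leaning on the 1970-01-01 default for job_last_run, might be:

-- seed the job row; job_last_run falls back to the '1970-01-01' default,
-- so the first event run picks up everything
INSERT INTO timeseries_etl_job (job_name)
VALUES ('timeseries_trend_product');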
As you have probably noticed, we have a lot more data in our table than just time and value. We are going to use each data field as a separate metric that can be aggregated and graphed. It will make more sense in a bit, I promise.
ETL Event
Now for the tricky bit. First you need to enable the event scheduler in MySQL. You should set event_scheduler=ON under the [mysqld] section of the configuration. You can also turn it on at runtime with a single query (it will reset when the server restarts).
SET GLOBAL event_scheduler = ON;
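If you want the setting to survive a restart, the configuration-file route is just a couple of lines (the file location varies by install; /etc/mysql/my.cnf is a common spot):

# my.cnf
[mysqld]
event_scheduler = ON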
After that is done, the event itself is very similar to writing a transaction with one or more queries, but with an event schedule header. Our event needs to do three things:
- Get the last run time of the job
- Set the last run time of the job to NOW()
- Execute a single INSERT INTO ... SELECT statement with all of the data rolled up as we need it, starting from the last time the job ran
DELIMITER $$
CREATE EVENT IF NOT EXISTS ts_aggregate_trend_product
ON SCHEDULE EVERY 1 HOUR
STARTS NOW()
ON COMPLETION PRESERVE
DO BEGIN
SELECT GET_LOCK('ts_etl_product', 0) INTO @got_lock;
IF @got_lock = 1 THEN
START TRANSACTION;
-- Get the last run time
SET @last_run := ( SELECT job_last_run from timeseries_etl_job where job_name = 'timeseries_trend_product');
-- set the last run time to now
UPDATE timeseries_etl_job SET job_last_run = NOW() WHERE job_name = 'timeseries_trend_product';
-- Rollup the data and insert into our time series table
INSERT INTO timeseries_trend_product(
`time`
, `value`
, order_item_id
, size
, color
, color_hex
, product
, product_id
, user_id
, quantity_sold
, total_price
) SELECT
DATE(order_item.created_at) as `time`
, ( ( order_item.price_items / order_item.quantity ) - order_item.discount_amount ) as value
, order_item.id
, size.name
, color.name
, color.hex_prim
, product.name
, product.product_id
, product.user_id
, order_item.quantity
, order_item.price
FROM order_item
INNER JOIN `order` ON `order`.id = order_item.order_id
INNER JOIN variant ON variant.variant_id = order_item.variant_id
INNER JOIN color ON color.color_id = variant.color_id
INNER JOIN size ON size.size_id = variant.size_id
INNER JOIN product ON product.product_id = variant.product_id
INNER JOIN user ON user.user_id = product.user_id
WHERE `order`.status = 'paid'
AND `order`.paid_at > @last_run
ORDER BY order_item.created_at ASC;
COMMIT;
SELECT RELEASE_LOCK('ts_etl_product') INTO @discard;
END IF;
END $$
DELIMITER ;
We are only doing a partial down-sample here. We are not calculating the total profit of all items up to now; we compute the profit on each item of a given order and store a little metadata about the item. We'll do the math at query time. Relational databases are still really good at adding! This single operation takes a multi-million row query across several tables down to one table with a few hundred thousand records.
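To make "the math at query time" concrete, a rollup over the new table is just a SUM and a GROUP BY. A sketch, using the column names from the DDL above, of profit per color per day for a single seller:

-- query-time rollup: profit per color per day for one seller
SELECT `time`, color AS metric, SUM(`value`) AS `value`
FROM timeseries_trend_product
WHERE user_id = 100
GROUP BY `time`, color
ORDER BY `time`;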
There are a couple of important things to note about the event query above.
Change The Default Delimiter
We need to change the delimiter of the event to something other than a semicolon so we can use semicolons inside the event body. Just remember to change it back at the end!
DELIMITER $$
Preserve Events
The default behavior ( ON COMPLETION NOT PRESERVE ) is for an event to be dropped once it expires, that is, after its final scheduled execution. Set ON COMPLETION to PRESERVE and the event definition sticks around.
ON COMPLETION PRESERVE
Last Execution Time
We need to fetch the last execution time of the job and store it in a variable, so we can constrain our query to only the records we haven't processed yet.
SET @last_run := ( SELECT job_last_run from timeseries_etl_job where job_name = 'timeseries_trend_product');
Acquire A Lock
This is how you obtain a mutex lock in MySQL. It isn't a table or row lock, just a named mutex. We use it here so event runs don't start stacking up; if we have a bad query that doesn't close its transaction, or something like that, we only do the bad thing once. Optional, but recommended.
SELECT GET_LOCK('ts_etl_product', 0) INTO @got_lock;
Cast Timestamps
The time column is only granular to a day, mainly because it makes grouping and aggregation easier and faster. You could certainly store these as DATETIME for higher precision, but performance would suffer a bit. There are simple workarounds (one is sketched below), but I'm going with DATE.
DATE(order_item.created_at) as `time`
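For reference, if you did keep a DATETIME column, one of those workarounds is simply truncating at query time instead, at the cost of a function call per row:

-- if `time` were stored as DATETIME, truncate to day granularity at query time
SELECT DATE(`time`) AS `time`, SUM(`value`) AS `value`
FROM timeseries_trend_product
GROUP BY DATE(`time`);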
Time Series Tastypie Resource
This is actually rather simple to set up with Tastypie. Using the default resource class, you can simply override the get_objects method, which, as the name implies, is used to return an array of data any way you can get it from the database.
var Resource = require('tastypie').Resource;

module.exports = Resource.extend({
  options:{
    fields:{
      time   : { type:'date' }
      , value  : { type:'int' }
      , metric : { type:'field' }
    }
  }
  , constructor: function( options ){
    this.parent('constructor', options );
  }
  , get_objects: function( bundle, callback ){
    // get_some_data() stands in for however you load rows from the database
    var array_of_data = get_some_data();
    callback( null, array_of_data );
  }
});
Bookshelf & Knex
I personally have had to implement this using the popular bookshelf, and knex libraries. Knex more so than bookshelf, and it is just as easy to work with dynamic data metrics
var tastypie = require('tastypie')
  , Resource = tastypie.Resource
  ;

// BookshelfModel is assumed to be the bookshelf model bound to the
// timeseries_trend_product table
module.exports = Resource.extend({
  options:{
    pk: 'timeseries_trend_product_id'
    , filtering:{
      time:[ 'range', 'period' ]
      , user:[ 'exact' ]
      , metric:[ 'exact' ]
    }
    , fields:{
      time   : { type:'date', help:'time of event' }
      , value  : { type:'int', help:'the sum of the metric at a given time' }
      , metric : { type:'field', help:'the metric we are measuring' }
      // user field here exclusively for filtering
      , user   : { type:'int', attribute:'user_id', exclude: true }
    }
  }
  , constructor: function( options ){
    this.parent( 'constructor', options );
  }
  , get_objects: function( bundle, callback ){
    let queryset = BookshelfModel.collection();

    queryset.query( this.aggregate.bind( this, bundle ) )
      .fetch()
      .then(function( objects ){
        // shallow serialization keeps bookshelf from walking relations
        callback( null, objects.toJSON({ shallow:true }) );
      })
      .catch( callback );
  }
  , aggregate: function( bundle, qb ){
    let metric = bundle.req.query.metric;
    // validate the metric...
    qb.select(
        'time'
        , `${metric} as metric`
      )
      .sum( 'value as value' )
      .groupBy( 'time' )
      .groupBy( 'metric' );

    delete bundle.req.query.metric;
    // apply other filtering logic
    // apply result limits
    // apply sorting
    return qb;
  }
});
This resource allows the user to shape the series data with a custom metric query parameter. Notice that we group by both time and the requested metric. That way the data comes back as one series per metric value, with our profit data summed by that metric for each day.
Time Windows
By default, these queries will return all data since the beginning of time. Albeit interesting, that will be very slow. We need an easy way to specify a limited window of time to look into: the last 6 months, year to date, etc.
We can use a simple filter to dynamically apply a time period constraint on our time field using time intervals, which most popular databases support. A filter function for tastypie to do that might look something like this:
//filters/period.js
module.exports = function period( qb, field, term ){
  // Result will be an array of [NUMBER, TIME FRAME]
  // [ 6, "MONTH" ]
  let result = parse_term( term );
  return qb.whereRaw(`${field} > CURDATE() - INTERVAL ${result[0]} ${result[1]}`);
};
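The parse_term helper isn't shown here, and it isn't part of tastypie; it just needs to turn a term like 6m or 1y into the [NUMBER, INTERVAL] pair the raw SQL expects. A minimal sketch might look like:

// a minimal parse_term sketch (assumed helper, not part of tastypie)
var UNITS = { d: 'DAY', w: 'WEEK', m: 'MONTH', y: 'YEAR' };

function parse_term( term ){
  var match = /^(\d+)\s*([dwmy])$/i.exec( String( term ) );
  if( !match ){
    throw new Error( 'invalid period: ' + term );
  }
  // [ 6, 'MONTH' ], [ 1, 'YEAR' ], etc.
  return [ parseInt( match[1], 10 ), UNITS[ match[2].toLowerCase() ] ];
}

Because the parsed pieces are a whitelisted integer and interval keyword, interpolating them into whereRaw stays safe.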
This little function applies a window of the current date less the passed-in time interval, 1 YEAR for example. It generates SQL like this:
SELECT * FROM TABLE where time > CURDATE() - INTERVAL 1 YEAR
And with that you can query a single endpoint for a specific metric over a specific window of time.
// ?format=json&user=100&metric=color&time__period=6m
[
  {
    time: '2016-01-01', // Day 1
    value: 1,
    metric: 'white'
  },{
    time: '2016-01-01',
    value: 2,
    metric: 'black'
  },{
    time: '2016-01-02', // Day 2
    value: 10,
    metric: 'white'
  },{
    time: '2016-01-02',
    value: 30,
    metric: 'black'
  }
  // and so on
]
Change the metric and the time filter and we get a different view of the data:
// ?format=json&user=100&metric=size&time__period=1y
[
  {
    time: '2015-01-01', // Day 1
    value: 10,
    metric: 'm'
  },{
    time: '2015-01-01',
    value: 30,
    metric: 'xl'
  },{
    time: '2015-01-02', // Day 2
    value: 5,
    metric: 'm'
  },{
    time: '2015-01-02',
    value: 100,
    metric: 'xl'
  }
  // and so on
]
This keeps the data format consistent and our response footprint small for fast response times. I have been able to return over 8000 records in under 200ms and less than 70kb rather consistently.
Hook Into Hapi
All that is left to make it real is to plug the resource into a Hapi server instance:
var hapi = require('hapi')
  , tastypie = require('tastypie')
  , TimeseriesResource = require("./timeseries")
  , v1
  , server
  ;

server = new hapi.Server();
server.connection({ port: 3000 });

v1 = new tastypie.Api('api/v1');
v1.use( new TimeseriesResource() );

server.register( [v1], function(){
  server.start( console.log );
});
Time series data on a budget, without any overcomplication, using tools you are already familiar with. And, for most use cases, it is fast enough.
- 2 tables
- 1 Event Query
- 1 Resource
- Fast Enough
Ship it.
-- If you are interested in the actual filter functionality for tastypie & bookshelf, I have started a repo on GitHub with some of the functionality here.