Speeding up a sklearn model pipeline to serve single predictions with very low latency

· 2291 words · 11 minute read

a.k.a. writing your own sklearn functions, part 3. If you have worked with sklearn before, you have certainly come across the struggle between using dataframes or arrays as inputs to your transformers and estimators. Both bring their advantages and disadvantages. But once you deploy your model, for example as a service, in many cases it will serve single predictions. Max Halford has shown some great examples of how to modify various sklearn transformers and estimators to serve single predictions with an extra performance boost and potential responses in the low millisecond range! In this short post we will build on these tricks and develop a full pipeline.

A few months ago Max Halford wrote an awesome blogpost where he describes how we can modify sklearn transformers and estimators to handle single data points at higher speed, essentially using one-dimensional arrays. When you build sklearn model pipelines they usually work with numpy arrays and pandas dataframes at the same time. Arrays often provide better performance, because the numpy implementations of many computations are highly performant and often vectorized. But it also gets trickier to control your transformations using column names, which arrays do not have. If you use pandas dataframes you might get worse performance, but your code might be more readable, and column names (i.e. feature names) stick with the data for most transformers. During data exploration and model training you are mostly interested in batch transformations and predictions, but once you deploy your trained model pipeline as a service, you might also be interested in single predictions. In that case service users will send a payload like the one below.

Imagine a service where we estimate the weight of a fish based on some size measurements (in reference to the fish market dataset introduced later). A request might look as follows:

{
    "species": "Bream",
    "length": 24.5,
    "height": 12.3,
    "width": 4.12
}

or alternatively ["Bream", 24.5, 12.3, 4.12], and the model may return a weight estimation as follows:

{
    "weight": 242
}

In his blogpost Max Halford showed how you can add transform_single and predict_single methods to transformers and estimators to process single datapoints with higher performance. Depending on the complexity of the pipeline, the absolute amount of saved time might not be huge. But response times add up across the overall latency of your service infrastructure, and short timings take pressure off the app, especially if it sits in the critical path. We also end up being able to save on infrastructure cost, as we can run our service on smaller hardware, i.e. smaller and fewer pods. Moreover, avoiding dataframe coercion will free up memory on the serving instances. Last but not least, we save time that we can essentially spend on more sophisticated transformations and models - something that makes every data scientist happy!

But what is the price for cutting the response time? We can explore this by looking at an example - not to advertise class inheritance here, but rather to sketch how this could work:

## Built with python 3.8.10 and the following packages:
## scikit-learn==0.24.0
## pandas==0.25.3
import numpy as np
from typing import Union, List


class BarebonesTransformer(Transformer):  # Transformer stands in for any sklearn transformer
    def transform_single(self, x: Union[np.ndarray, List], **kwargs):
        # a fast transformation on single datapoints, equivalent to self.transform
        return some_fast_transformation(x, **kwargs)  # placeholder for the actual computation
        
barebones_transformer = BarebonesTransformer()

barebones_transformer.fit(data)
barebones_transformer.transform_single([1.0, 2.5])

On the one hand we risk losing training & inference/prediction parity. What does this mean? As we can see above, we essentially have two different code paths our data can take during the transformation: the code path for single predictions will only be used for inference on single data points, but not during training, where we normally transform in batches, i.e. dataframes or arrays. Therefore we need to put extra effort into making sure that both code paths implement the same transformation and therefore produce the same results. This can for example be achieved by adding some extra unit tests, as sketched below.
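
For illustration, such a parity test could compare the two code paths row by row. This is a minimal sketch, assuming a fitted transformer that exposes both transform and transform_single, like the ones we build below:

import numpy as np


def test_transform_single_matches_transform(transformer, data):
    # batch code path: the regular sklearn transform
    batch_output = transformer.transform(data)
    # single-datapoint code path: one row at a time
    single_output = np.vstack([
        transformer.transform_single(row) for row in np.asarray(data)
    ])
    assert np.allclose(batch_output, single_output)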

On the other hand we might lose some of the validation sklearn does internally, i.e. when the parent transform method is called. Therefore we need to make sure to properly validate our payloads before they are passed to the model, to keep the model from crashing unexpectedly.
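
A lightweight validation step for the fish-weight payload from above could look like the following sketch (the EXPECTED_FEATURES schema and the validate_payload helper are hypothetical, purely for illustration):

import numbers

# hypothetical schema for the fish-weight service payload
EXPECTED_FEATURES = ['species', 'length', 'height', 'width']


def validate_payload(payload: dict) -> list:
    # check that all expected features are present
    missing = [f for f in EXPECTED_FEATURES if f not in payload]
    if missing:
        raise ValueError(f'missing features: {missing}')
    # basic type checks that sklearn would otherwise have done for us
    if not isinstance(payload['species'], str):
        raise ValueError('species must be a string')
    for f in EXPECTED_FEATURES[1:]:
        if not isinstance(payload[f], numbers.Number):
            raise ValueError(f'{f} must be numeric')
    # return the values in the order the model expects
    return [payload[f] for f in EXPECTED_FEATURES]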

The same ideas also apply to estimators and the predict method. In the end it is like sending a letter via truck (plus insurance): it works and it is a safe option, but it might be excessive, and a mail carrier on a bike might be more appropriate and faster.

If we are happy to accept both drawbacks, we can save some time per request by spending some time on adapting existing transformers for single datapoints.

Now that we have seen how we could get this working, let us evaluate the performance of the pandas and numpy-based approaches using some toy data and sklearn’s SimpleImputer, a transformer that imputes missing data, for example using the mean. We will use the quite robust pd.isna to check for missing values in our 1d array:

import pandas as pd
import numpy as np

np.random.seed(47723)

# truncate decimals for better printing
np.set_printoptions(precision=3, suppress=True)
pd.set_option('precision', 3)

n = 1000
data = pd.DataFrame({
    'num1': np.random.normal(0, 1, n),
    'num2': np.random.normal(0, 1, n)
})

# remove 10% of the data
data[np.random.rand(*data.shape) > 0.9] = np.nan

data.head()
##     num1   num2
## 0  0.897 -1.626
## 1  1.370  0.279
## 2    NaN -0.652
## 3  1.379 -0.164
## 4  0.450    NaN

The SimpleImputer stores the fitted imputation values in self.statistics_ (by convention fitted values always end with an underscore):

from sklearn.impute import SimpleImputer

simple_imputer = SimpleImputer(strategy='mean')
simple_imputer.fit_transform(data)
## array([[ 0.897, -1.626],
##        [ 1.37 ,  0.279],
##        [ 0.071, -0.652],
##        ...,
##        [-0.233,  0.741],
##        [ 0.071, -0.627],
##        [-1.056, -0.622]])
simple_imputer.statistics_
## array([0.071, 0.016])

We can use these values in our transform_single to fill missing values:

from sklearn.impute import SimpleImputer

class BarebonesSimpleImputer(SimpleImputer):
    def transform_single(self, x):
        return np.where(pd.isnull(x), self.statistics_, x)

barebones_simple_imputer = BarebonesSimpleImputer(strategy='mean')
barebones_simple_imputer.fit_transform(data)
## array([[ 0.897, -1.626],
##        [ 1.37 ,  0.279],
##        [ 0.071, -0.652],
##        ...,
##        [-0.233,  0.741],
##        [ 0.071, -0.627],
##        [-1.056, -0.622]])
barebones_simple_imputer.transform_single(np.array([np.nan, 3]))
## array([0.071, 3.   ])

Let us now evaluate the performance improvement. We will make use of timeit and some simple helper functions to measure the timings in milliseconds:

from timeit import timeit


def time_func_call(call: str, n: int = 1000):
    t = timeit(call, globals=globals(), number=n) / n
    t_ms = np.round(t * 1000, 4)
    return t_ms

time_func_call('barebones_simple_imputer.transform(data)')
## 2.0122

We will define another helper function which compares and pretty-prints multiple function call timings:

from typing import List


def time_func_calls(calls: List[str]):
    max_width = np.max([len(call) for call in calls])
    for call in calls:
        t_ms = time_func_call(call)
        print(f'{call:{max_width}}: {t_ms:.4f}ms')
    return

We can now apply this to multiple and single datapoints in the form of dataframes and numpy arrays:

data_single_row = data.iloc[[0],:]
data_array = data.to_numpy()

data_single_1d_array = data_array[0]
data_single_1d_array
## array([ 0.897, -1.626])
data_single_2d_array = data_array[[0],:]
data_single_2d_array
## array([[ 0.897, -1.626]])
time_func_calls([
    'barebones_simple_imputer.transform(data)',
    'barebones_simple_imputer.transform(data_array)',
    'barebones_simple_imputer.transform(data_single_row)',
    'barebones_simple_imputer.transform(data_single_2d_array)',
    'barebones_simple_imputer.transform_single(data_single_1d_array)',
])
## barebones_simple_imputer.transform(data)                       : 2.0333ms
## barebones_simple_imputer.transform(data_array)                 : 0.3363ms
## barebones_simple_imputer.transform(data_single_row)            : 1.9852ms
## barebones_simple_imputer.transform(data_single_2d_array)       : 0.3181ms
## barebones_simple_imputer.transform_single(data_single_1d_array): 0.0172ms

So the single-datapoint transformation clearly outperforms the other variants. Let us quickly check out the OneHotEncoder, another very helpful transformer, which encodes categorical variables as dummy variables. We will again define some toy data:

n = 3000

data = pd.DataFrame({
    'cat1': np.random.choice(['a', 'b', 'c'], n),
    'cat2': np.random.choice(['x', 'y'], n)
})

data.head()
##   cat1 cat2
## 0    a    x
## 1    b    x
## 2    b    y
## 3    a    x
## 4    b    y

The OneHotEncoder stores the learned categories in a list in self.categories_, from where we can pick them up and use them to encode the categorical variables:

from sklearn.preprocessing import OneHotEncoder

class BarebonesOneHotEncoder(OneHotEncoder):
    def transform_single(self, x):
        x_enc = [np.where(cats==x[col], 1, 0) for col, cats in enumerate(self.categories_)]
        return np.concatenate(x_enc, axis=0)

barebones_one_hot_encoder = BarebonesOneHotEncoder(sparse=False, handle_unknown='ignore')
barebones_one_hot_encoder.fit_transform(data)
## array([[1., 0., 0., 1., 0.],
##        [0., 1., 0., 1., 0.],
##        [0., 1., 0., 0., 1.],
##        ...,
##        [0., 0., 1., 0., 1.],
##        [1., 0., 0., 1., 0.],
##        [0., 1., 0., 1., 0.]])
barebones_one_hot_encoder.categories_
## [array(['a', 'b', 'c'], dtype=object), array(['x', 'y'], dtype=object)]
barebones_one_hot_encoder.transform_single(['b', 'x'])
## array([0, 1, 0, 1, 0])

Let us benchmark the different cases again:

data_single_row = data.iloc[[0],:]
data_array = data.to_numpy()

data_single_1d_array = data_array[0]
data_single_1d_array
## array(['a', 'x'], dtype=object)
data_single_2d_array = data_array[[0],:]
data_single_2d_array
## array([['a', 'x']], dtype=object)
time_func_calls([
    'barebones_one_hot_encoder.transform(data)',
    'barebones_one_hot_encoder.transform(data_array)',
    'barebones_one_hot_encoder.transform(data_single_row)',
    'barebones_one_hot_encoder.transform(data_single_2d_array)',
    'barebones_one_hot_encoder.transform_single(data_single_1d_array)',
])
## barebones_one_hot_encoder.transform(data)                       : 2.2561ms
## barebones_one_hot_encoder.transform(data_array)                 : 1.8474ms
## barebones_one_hot_encoder.transform(data_single_row)            : 1.0046ms
## barebones_one_hot_encoder.transform(data_single_2d_array)       : 0.4966ms
## barebones_one_hot_encoder.transform_single(data_single_1d_array): 0.0181ms

Now let us plug this together and measure the overall performance improvement of a common pipeline. We will fetch the fish market dataset, which contains size measurements and species of fish, and predict their weight.

def get_data(na_rate=0):
    data = pd.read_csv('https://gist.githubusercontent.com/stelsemeyer/32cd8a83338c11f354c4cd46167744d7/raw/' + 
                       'e73b350d88f1ff2d6d3ca0f6246a3f4ef8c5ad8b/fish_market_data.csv')
    data.columns = [col.replace(' ', '_').lower() for col in data.columns]
    
    target_col = 'weight'
    x, y = data.drop([target_col], axis=1), data[target_col]
    
    if na_rate>0:
        np.random.seed(123)
        x[np.random.rand(*x.shape) > 1-na_rate] = np.nan
    
    return x, y

x, y = get_data(na_rate=.05)

The data looks as follows:

x.head()
##   species  length1  length2  length3  height  width
## 0   Bream     23.2     25.4     30.0  11.520  4.020
## 1     NaN     24.0     26.3     31.2  12.480  4.306
## 2   Bream     23.9     26.5     31.1  12.378  4.696
## 3   Bream     26.3     29.0     33.5  12.730  4.455
## 4   Bream     26.5     29.0     34.0  12.444  5.134
y.head()
## 0    242.0
## 1    290.0
## 2    340.0
## 3    363.0
## 4    430.0
## Name: weight, dtype: float64

If we want to apply imputation and one-hot-encoding to our data we need to use ColumnTransformers to dispatch the transformations to the correct columns. Thus we need to make some minor modifications to be able to use the transform_single method:

  • implement transform_single similar to transform, e.g. using self._iter
  • implement an identity transformer with a transform_single method which can be passed to handle the remainder, i.e. the remaining columns

from sklearn.compose import ColumnTransformer
from sklearn.base import BaseEstimator, TransformerMixin


class BarebonesColumnTransformer(ColumnTransformer):
    def transform_single(self, x, **kwargs):
        Xs = list()
        # iterate over the fitted transformers and concatenate the transformed column subsets
        for name, trans, column, _ in self._iter(fitted=True, replace_strings=True):
            Xs += [trans.transform_single(x[column])]
        return np.hstack(Xs)


class IdentityTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        return X
    
    def transform_single(self, x, y=None):
        return x

If we want to use the barebones transformers and estimators in a pipeline, we have to modify the pipeline itself as well: we add a predict_single method, similar to predict, which uses the transform_single methods of the transformers and calls predict_single on the model, as Max also describes in his post.

from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression

class BarebonesPipeline(Pipeline):
    def predict_single(self, x):
        for _, transformer in self.steps[:-1]:
            x = transformer.transform_single(x)
        return self.steps[-1][1].predict_single(x)
    
    # helpful if you want to run the transformations without the model
    def transform_single(self, x):
        for _, transformer in self.steps[:-1]:
            x = transformer.transform_single(x)
        return x
        

class BarebonesLinearRegression(LinearRegression):
    def predict_single(self, x):
        return self.intercept_ + np.dot(self.coef_, x)

We can now construct our pipeline. We will impute the categorical variable with the most frequent value and the numeric values with the mean (not the cleverest imputation methods here, since a strong relation between the features exists and a conditional mean or nearest-neighbour approach would be better). We will then one-hot-encode the categorical variable and train a linear model on the data.

imputer = BarebonesColumnTransformer(
    transformers=[
        ('category_imputer', BarebonesSimpleImputer(strategy='most_frequent'), [0]),
        ('numeric_imputer', BarebonesSimpleImputer(), [1, 2, 3, 4, 5]),
    ],
    remainder='drop'
)

one_hot_encoder = BarebonesColumnTransformer(
    transformers=[
        ('one_hot_encoder', BarebonesOneHotEncoder(handle_unknown='ignore', sparse=False), [0]),
    ],
    remainder=IdentityTransformer()
)

barebones_pipeline = BarebonesPipeline(
    steps=[
        ('imputer', imputer),
        ('one_hot_encoder', one_hot_encoder),
        ('model', BarebonesLinearRegression())
    ]
)

barebones_pipeline.fit(x, y);

Now let us apply the pipeline to our data to benchmark the performance on single predictions:

x_single_row = x.iloc[[0],:]

x_array = x.to_numpy()

x_single_1d_array = x_array[0]
x_single_1d_array
## array(['Bream', 23.2, 25.4, 30.0, 11.52, 4.02], dtype=object)
x_single_2d_array = x_array[[0],:]
x_single_2d_array
## array([['Bream', 23.2, 25.4, 30.0, 11.52, 4.02]], dtype=object)
time_func_calls([
    'barebones_pipeline.predict(x)',
    'barebones_pipeline.predict(x_array)',
    'barebones_pipeline.predict(x_single_row)',
    'barebones_pipeline.predict(x_single_2d_array)',
    'barebones_pipeline.predict_single(x_single_1d_array)',
])
## barebones_pipeline.predict(x)                       : 7.5262ms
## barebones_pipeline.predict(x_array)                 : 3.0187ms
## barebones_pipeline.predict(x_single_row)            : 7.2470ms
## barebones_pipeline.predict(x_single_2d_array)       : 2.6604ms
## barebones_pipeline.predict_single(x_single_1d_array): 0.1141ms

Let us finally verify that both prediction paths yield identical results. Running predict still uses the full-fledged sklearn code path, as opposed to our lightweight {transform,predict}_single methods:

batch_predictions = barebones_pipeline.predict(x)
batch_predictions[0:5]
## array([285.874, 418.604, 363.433, 417.214, 459.909])
single_predictions = [barebones_pipeline.predict_single(x_i) for x_i in x.to_numpy()]
single_predictions[0:5]
## [285.87391827684905, 418.6038670618609, 363.433474702691, 417.2142372313433, 459.90900599287806]
np.all(np.isclose(batch_predictions, single_predictions, atol = 0.0001))
## True
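
To close the loop with the request from the beginning of the post: in a service, a handler could map the JSON payload onto a 1d array and call predict_single. The following is a sketch; the predict_weight function and the FEATURE_ORDER list are assumptions based on the pipeline fitted above:

import numpy as np

# column order the pipeline above was fitted on
FEATURE_ORDER = ['species', 'length1', 'length2', 'length3', 'height', 'width']


def predict_weight(payload: dict) -> dict:
    # build the 1d object array in the expected column order
    x_single = np.array([payload[f] for f in FEATURE_ORDER], dtype=object)
    return {'weight': float(barebones_pipeline.predict_single(x_single))}

predict_weight({
    'species': 'Bream', 'length1': 23.2, 'length2': 25.4,
    'length3': 30.0, 'height': 11.52, 'width': 4.02,
})
## should return roughly {'weight': 285.87}, matching the first batch prediction above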

We can speed up our pipeline by a factor of more than 20 for single predictions (comparing predict on a single-row 2d array with predict_single). The more transformations we add, the more valuable the speedup will be, and the clearer the tradeoff might become. We have seen how we can use custom transformers or adjust existing ones to speed up single-datapoint transformations and predictions, at the price of extra time spent on engineering (especially if the transformation is more complex), and extra care spent on training-inference parity, unit tests and data validation.

If you are trying to find bottlenecks in your transformers, I recommend using line_profiler and memory_profiler. Their output might be hard to digest for the whole pipeline (you have to pass all the individual functions to the profiler), but it works well on individual transformers. You can use the profiler in the following fashion, with magic:

x = np.array([[np.nan, 3]])

%load_ext line_profiler
# barebones_simple_imputer was fitted on the numeric toy data above
%lprun -f barebones_simple_imputer.transform barebones_simple_imputer.transform(x)

or without magic:

from line_profiler import LineProfiler


x = np.array(['a', 'x'])

profiler = LineProfiler()
profiler.add_function(barebones_one_hot_encoder.transform_single)
# you can pass more functions to get a detailed output as follows:
# profiler.add_function(barebones_simple_imputer.some_attr.some_func)
# profiler.add_function(pd.isna)

profiler.runctx('barebones_one_hot_encoder.transform_single(x)', globals(), locals())
profiler.print_stats() 
## Timer unit: 1e-06 s
## 
## Total time: 0.000101 s
## File: <ipython-input-94-83230faba6d8>
## Function: transform_single at line 4
## 
## Line #      Hits         Time  Per Hit   % Time  Line Contents
## ==============================================================
##      4                                               def transform_single(self, x):
##      5         1         73.0     73.0     72.3          x_enc = [np.where(cats==x[col], 1, 0) for col, cats in enumerate(self.categories_)]
##      6         1         28.0     28.0     27.7          return np.concatenate(x_enc, axis=0)