Speed up custom operations on large datasets with universal functions#

Vectorisation refers to applying the same calculation to an entire array at once, and it is key to the performance of array manipulation. So much so that between numpy, xarray, and scipy, practically all conceivable operations have already been vectorised.

But as we learn time and again, researchers always find new and exciting calculations to do, and out come the for loops. For loops are easy to understand, and they are not a bad way to explore how your data behaves.

But Python loops are terribly inefficient, as you’ll see shortly, so for the actual number crunching you should use vectorisation.
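To make the idea concrete, here is a minimal standalone sketch (not using the article’s data) comparing a Python loop with the equivalent vectorised numpy expression; the loop is dramatically slower, as we will measure properly below.

import numpy as np  # imported again below with the rest of the modules

a = np.arange(1_000_000)
squared_loop = np.array([x**2 for x in a])  # element-by-element Python loop
squared_vec = a**2                          # a single vectorised operation
assert np.array_equal(squared_loop, squared_vec)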

Let’s first load the relevant modules, then look at a few examples.

import numpy as np
import pandas as pd
import xarray as xr
from dask.distributed import Client
import dask.array as da
import scipy.stats as stats

Generate some data for testing#

The following function creates a data array filled with random numbers. I will use it several times over the course of this article to generate data arrays of various shapes and sizes, but its actual contents are not that relevant.

def create_dataarray(nlat, nlon, ntime=None, seed=1000):

    np.random.seed(seed)
    
    lat = np.linspace(-90, 90, nlat, endpoint=True)
    lat = xr.DataArray(lat, dims=('lat',), coords={'lat': lat}, attrs={'units': 'degree_north', 'name': 'Latitude'})
    
    lon = np.linspace(-180, 180, nlon+1, endpoint=True)[1:]
    lon = xr.DataArray(lon, dims=('lon',), coords={'lon': lon}, attrs={'units': 'degree_east', 'name': 'Longitude'})

    if ntime is not None:
        time = pd.date_range(start='2000-01-01', freq='D', periods=ntime)
        time = xr.DataArray(time, dims=('time',))
        return xr.DataArray(
            np.random.random([ntime, nlat, nlon]), 
            dims=('time', 'lat', 'lon'),
            coords={'time': time, 'lat': lat, 'lon': lon},
            attrs={'name': 'random'}
        )

    return xr.DataArray(
        np.random.random([nlat, nlon]), 
        dims=('lat', 'lon'),
        coords={'lat': lat, 'lon': lon},
        attrs={'name': 'random'}
    )

Scalar Functions#

First, let’s start with a function that takes a scalar value and returns a scalar value. We want to apply this function to every value of a data array.

The function in our example is not particularly complex: it caps values at a given maximum, by default 0.5.

def limit(value, max_value=0.5):
    return max_value if max_value < value else value

print(f"{limit(0.2)=}")
print(f"{limit(1.2)=}")
print(f"{limit(0.2, max_value=0.1)=}")
limit(0.2)=0.2
limit(1.2)=0.5
limit(0.2, max_value=0.1)=0.1

To verify that our function does exactly what we want, we start with a very small array where we can check the result by eye.

small_da = create_dataarray(1, 5)
small_da
<xarray.DataArray (lat: 1, lon: 5)>
array([[0.65358959, 0.11500694, 0.95028286, 0.4821914 , 0.87247454]])
Coordinates:
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 -108.0 -36.0 36.0 108.0 180.0
Attributes:
    name:     random

We want to apply the limit function to this data array. This is what the xarray.apply_ufunc method does. ufunc in this case means “Universal Function” and the apply_ufunc method is meant to be able to apply almost any function to an array.

The positional arguments of apply_ufunc are thus:

  1. The function, in our case limit

  2. The arrays that contain the data to be passed to the function. In our case this is only one array: small_da

Our function limit expects scalar values, so we need to tell the call to apply_ufunc that it needs to vectorise the array. This is done by the vectorize=True parameter. (Note the American spelling.) It means that the method cuts the array into its elements, feeds each element individually into the function, and finally collects the returned values into a new array:

xr.apply_ufunc(limit, small_da, vectorize=True)
<xarray.DataArray (lat: 1, lon: 5)>
array([[0.5       , 0.11500694, 0.5       , 0.4821914 , 0.5       ]])
Coordinates:
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 -108.0 -36.0 36.0 108.0 180.0

This worked just as expected: All the values that used to be larger than 0.5 are now capped at that value.

We can supply additional parameters to the function by using the kwargs (keyword arguments) parameter. This parameter expects a dictionary with the name of the keyword as the key, and its value as the value to be passed. In this case we lower the limit to 0.3:

xr.apply_ufunc(limit, small_da, vectorize=True, kwargs={'max_value': 0.3})
<xarray.DataArray (lat: 1, lon: 5)>
array([[0.3       , 0.11500694, 0.3       , 0.3       , 0.3       ]])
Coordinates:
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 -108.0 -36.0 36.0 108.0 180.0

Now the cutoff has been lowered to 0.3, and more values have been capped.

With vectorize=True, the apply_ufunc method under the hood uses the numpy.vectorize method to vectorise the input. Doing this explicitly would look something like this:

xr.apply_ufunc(np.vectorize(limit), small_da)
<xarray.DataArray (lat: 1, lon: 5)>
array([[0.5       , 0.11500694, 0.5       , 0.4821914 , 0.5       ]])
Coordinates:
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 -108.0 -36.0 36.0 108.0 180.0

The apply_ufunc documentation mentions that this might be suboptimal performance-wise, and that pre-vectorised functions should be used instead.

A vectorised function already expects an array. In our case it would look something like this:

def limit_v(array, value=0.5):
    # np.where applies the cap to the whole array in one vectorised operation
    return np.where(array > value, value, array)

xr.apply_ufunc(limit_v, small_da)
<xarray.DataArray (lat: 1, lon: 5)>
array([[0.5       , 0.11500694, 0.5       , 0.4821914 , 0.5       ]])
Coordinates:
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 -108.0 -36.0 36.0 108.0 180.0

Of course in this section we were talking about scalar functions, so this is a bit of a spoiler for what’s coming up.

Performance Comparison of Vectorisation#

To compare the performance of the various ways of applying the limit function to an array, we create a larger array and then use IPython’s %%time and %%timeit magics to measure the performance of each method.

What are these magics?

The %%time keyword (called a ‘magic’ by IPython) measures the time it takes to execute a cell and prints it underneath. The %%timeit keyword runs the cell several times and reports the mean and standard deviation of the runs.

large_da = create_dataarray(180, 360)
%%time
limited = large_da.copy()
for x in range(len(large_da.lon)):
    for y in range(len(large_da.lat)):
        limited[y, x] = limit(large_da[y, x])
CPU times: user 5.84 s, sys: 1.99 ms, total: 5.85 s
Wall time: 5.85 s
%%timeit
limited = xr.apply_ufunc(limit, large_da, vectorize=True)
4.94 ms ± 15 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%%timeit
limited = xr.apply_ufunc(limit_v, large_da)
61.4 µs ± 66.4 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

Conclusion for scalar functions#

It is pretty clear that we gain a performance boost of several orders of magnitude over the explicit loops, even with the automatically vectorised call.

This just shows how big a performance hit explicit Python loops incur.

The best case is a function that is already vectorised, i.e. one that works on whole arrays, but writing one is not always feasible.
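For concreteness, here is a quick back-of-the-envelope calculation based on the timings above (the exact numbers will of course differ from machine to machine):

# rough speedup factors relative to the explicit Python loops
print(5.85 / 4.94e-3)   # ~1200x for apply_ufunc with vectorize=True
print(5.85 / 61.4e-6)   # ~95,000x for the pre-vectorised function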

Reduction functions#

Next we want to see what we need to do when our function reduces an array dimension to a single value.

For example, this returns the index of the largest value along its axis:

def argmax(array):
    return array.argmax()

You might think that this is already vectorised because it expects an array, but it is not: The function expects a 1-d array, and we want to apply it to the time dimension of a 3-d array.
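A small numpy example makes the problem visible: called on a 2-d array without an axis argument, argmax flattens the array first and returns a single index, not one index per gridpoint.

# argmax without an axis argument works on the flattened array
a = np.array([[0.1, 0.9],
              [0.8, 0.2]])
print(a.argmax())         # 1: a single index into the flattened array
print(a.argmax(axis=-1))  # [1 0]: one index per row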

First, we create a small data array with a time dimension, and then we use the function above to get the largest value along the time dimension for each lat/lon gridpoint.

small_da = create_dataarray(1, 2, 5)
small_da
<xarray.DataArray (time: 5, lat: 1, lon: 2)>
array([[[0.65358959, 0.11500694]],

       [[0.95028286, 0.4821914 ]],

       [[0.87247454, 0.21233268]],

       [[0.04070962, 0.39719446]],

       [[0.2331322 , 0.84174072]]])
Coordinates:
  * time     (time) datetime64[ns] 2000-01-01 2000-01-02 ... 2000-01-05
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 0.0 180.0
Attributes:
    name:     random

Core dimensions#

The concept of core dimensions isn’t very well explained in the documentation.

Our function looks along the time dimension for the largest value. In xarray terminology, this makes time the core dimension of the function’s input: this dimension is core to what the function does.

input_core_dims is a list of tuples of dimension names: one tuple per input array, listing that array’s core dimensions. Because our function has only a single input array, the outer list has only one element, the tuple ('time',). And because time is the only core dimension of that array, the tuple contains just this one name.

Why is there a comma in “(‘time’,)”?

Simple parentheses around a Python expression are just evaluated, so ('time') is the same as 'time', which is a string and therefore iterable (!). Python would then think that the core dimensions are t, i, m, and e. The comma after 'time' tells Python: no, this is a tuple with a single element, the string “time”. For tuples with more than one element, the trailing comma is no longer necessary.
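A quick check in a Python session makes the difference obvious:

print(type(('time')))   # <class 'str'>
print(type(('time',)))  # <class 'tuple'>
print(list('time'))     # ['t', 'i', 'm', 'e'] -- what iterating over the bare string yields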

xr.apply_ufunc(argmax, small_da, input_core_dims=[('time',)], vectorize=True)
<xarray.DataArray (lat: 1, lon: 2)>
array([[1, 4]])
Coordinates:
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 0.0 180.0

Vectorisation, again#

Again, we want to see whether we can manually vectorise a function.

With vectorize=True the data is sliced into 1-d arrays that are individually passed to the function. This means that argmax is called once for each individual lat/lon grid point.

With vectorize=False (or no vectorize at all), the full array is passed to the function in one go, except that the core dimension is placed at the very end.

We can exploit this by passing axis=-1 to numpy’s argmax method, telling it to reduce only the last dimension, regardless of how many dimensions the array has.

def argmax_v(array, verbose=False):
    if verbose:
        print(f"{array.shape=}")
    return array.argmax(axis=-1)

I’ve added the verbose parameter to the vectorised function, so that I can show what happens when the apply_ufunc is called with and without vectorize=True:

r1 = xr.apply_ufunc(
    argmax_v, small_da, 
    input_core_dims=[('time',)], 
    vectorize=True, kwargs={'verbose': True},
)
array.shape=(5,)
array.shape=(5,)
r2 = xr.apply_ufunc(
    argmax_v, small_da, 
    input_core_dims=[('time',)], 
    vectorize=False, kwargs={'verbose': True},
)
array.shape=(1, 2, 5)

As you can see, with vectorize=True the multidimensional array is sliced into subarrays along the core dimension, in this case 1-d arrays along “time”.

Because we have 2 gridpoints (2 along longitude times 1 along latitude), the function is called twice.

On the other hand, with vectorize=False the whole array is passed to the function in one go, with the core dimension moved to the last axis. (Note the 5 at the end of the shape, whereas in the data array it is the first entry of the shape.)
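What the function receives with vectorize=False is essentially the data with the core dimension transposed to the last axis; a minimal sketch of that reordering:

# the core dimension 'time' ends up as the last axis
reordered = small_da.transpose('lat', 'lon', 'time')
print(reordered.shape)  # (1, 2, 5)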

But how do the two compare performance-wise? Again, we create a larger dataset for the comparisons.

large_da = create_dataarray(181, 360, 365)
%%timeit
am = xr.apply_ufunc(argmax, large_da, input_core_dims=[('time',)], vectorize=True)
87.4 ms ± 277 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
am_v = xr.apply_ufunc(argmax_v, large_da, input_core_dims=[('time',)])
38.1 ms ± 253 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

We can see that there is again a noticeable performance improvement: the pre-vectorised version runs roughly twice as fast.

Functions that take and return an array#

Sometimes, the function not only takes an array as input, but also returns an array.

For this example, we want to reverse (mirror) the values along one dimension.

def invert(array):
    return array[ ..., -1::-1]

print(f"{invert(np.array([1, 2, 3, 4]))=}")
invert(np.array([1, 2, 3, 4]))=array([4, 3, 2, 1])

What is …

numpy supports the notation ... (Ellipsis) as a stand-in for all dimensions not explicitly indexed. The syntax array[..., -1::-1] means, depending on the number of dimensions of array: array[-1::-1] for a 1-d array, array[:, -1::-1] for a 2-d array, array[:, :, -1::-1] for a 3-d array, and so forth.
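A quick check that the two notations select the same values for a 3-d array:

a = np.arange(24).reshape(2, 3, 4)
print(np.array_equal(a[..., -1::-1], a[:, :, -1::-1]))  # True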

small_da = create_dataarray(2, 5)
small_da
<xarray.DataArray (lat: 2, lon: 5)>
array([[0.65358959, 0.11500694, 0.95028286, 0.4821914 , 0.87247454],
       [0.21233268, 0.04070962, 0.39719446, 0.2331322 , 0.84174072]])
Coordinates:
  * lat      (lat) float64 -90.0 90.0
  * lon      (lon) float64 -108.0 -36.0 36.0 108.0 180.0
Attributes:
    name:     random
xr.apply_ufunc(
    invert, small_da, input_core_dims=[('lon',)], 
    output_core_dims=[('lon',)]
)
<xarray.DataArray (lat: 2, lon: 5)>
array([[0.87247454, 0.4821914 , 0.95028286, 0.11500694, 0.65358959],
       [0.84174072, 0.2331322 , 0.39719446, 0.04070962, 0.21233268]])
Coordinates:
  * lat      (lat) float64 -90.0 90.0
  * lon      (lon) float64 -108.0 -36.0 36.0 108.0 180.0

As you can see, the values along the longitude dimension have indeed been reversed.

Because the input core dimension and the output core dimension have the same name, apply_ufunc assumes that the output keeps the same coordinates. Note that the longitude values are still in the original, ascending order.
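We can verify this directly; the result’s lon coordinate is identical to that of the input (the variable name inverted here is just for illustration):

inverted = xr.apply_ufunc(
    invert, small_da, input_core_dims=[('lon',)], output_core_dims=[('lon',)]
)
print((inverted.lon == small_da.lon).all().item())  # True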

Differing length between input and output core dimension#

We have just looked at a function that returns an array of the same shape as its input, but that might not necessarily be so.

The input and output of a function might be arrays of different length.

In this case we simply truncate the time dimension to its first 3 values, forcing a change in the length of that dimension:

def first_three(array):
    return array[ ..., :3]

If I try, as above, to give the output core dimension the same name, I get an error: the same name implies the same coordinates, but the length of the dimension no longer matches.

There are two options to resolve this:

  1. Give the output core dimension a new name. In the first example below we call it new_time. The result has a dimension with the new name, but no coordinates attached to it.

  2. Tell apply_ufunc to allow the change. This is done with the exclude_dims parameter, as seen in the second example. It expects a set of dimension names that are allowed to change size. Again, the coordinates for the time dimension are dropped.

small_da = create_dataarray(1, 2, 5)
small_da
<xarray.DataArray (time: 5, lat: 1, lon: 2)>
array([[[0.65358959, 0.11500694]],

       [[0.95028286, 0.4821914 ]],

       [[0.87247454, 0.21233268]],

       [[0.04070962, 0.39719446]],

       [[0.2331322 , 0.84174072]]])
Coordinates:
  * time     (time) datetime64[ns] 2000-01-01 2000-01-02 ... 2000-01-05
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 0.0 180.0
Attributes:
    name:     random
xr.apply_ufunc(
    first_three, 
    small_da, 
    input_core_dims=[('time',)], 
    output_core_dims=[('new_time',)]
)
<xarray.DataArray (lat: 1, lon: 2, new_time: 3)>
array([[[0.65358959, 0.95028286, 0.87247454],
        [0.11500694, 0.4821914 , 0.21233268]]])
Coordinates:
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 0.0 180.0
Dimensions without coordinates: new_time
xr.apply_ufunc(
    first_three, small_da, 
    input_core_dims=[('time',)], 
    output_core_dims=[('time',)],
    exclude_dims=set(['time'])
)
<xarray.DataArray (lat: 1, lon: 2, time: 3)>
array([[[0.65358959, 0.95028286, 0.87247454],
        [0.11500694, 0.4821914 , 0.21233268]]])
Coordinates:
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 0.0 180.0
Dimensions without coordinates: time

Applying a ufunc over multiple arrays#

Now that we have a rough understanding of how xarray.apply_ufunc works, let’s try something more complex.

We want to calculate the Pearson Correlation Coefficient between the input dataset and another array.

scipy.stats has a function called pearsonr that implements this calculation. It returns a result object with two attributes: statistic and pvalue.

For our example we’re only interested in the first, and completely ignore the pvalue.
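As a quick illustration of that return value, here is pearsonr on two small made-up lists:

res = stats.pearsonr([1, 2, 3, 4], [1.2, 1.9, 3.1, 4.2])
print(res.statistic)  # close to 1 for these almost-linear values
print(res.pvalue)     # the p-value, which we ignore in this article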

First, we create a new array, and because the values are purely random, we pick a specific slice of it as the comparison data. That way we should get exactly one gridpoint with a high statistic.

I’m using a slightly modified version straight out of the numpy.vectorize API reference.

The signature parameter describes the shape of the values the function expects and returns. In this case it expects two 1-d arrays of the same length and returns a single scalar.

def my_pearson(x, y):
    return stats.pearsonr(x, y).statistic

my_pearson_v = np.vectorize(
    my_pearson,
    signature='(n), (n) -> ()'
)

The two arrays are passed one after the other as positional arguments. Note that input_core_dims now needs to list the core dimensions of both input arrays.

small_da = create_dataarray(1, 2, 5)
comparison_da = small_da.isel(lat=0, lon=-1)
xr.apply_ufunc(
    my_pearson_v, 
    small_da, 
    comparison_da,
    input_core_dims=[('time',), ('time',)]
)
<xarray.DataArray (lat: 1, lon: 2)>
array([[-0.4356561,  1.       ]])
Coordinates:
  * lat      (lat) float64 -90.0
  * lon      (lon) float64 0.0 180.0

We see indeed perfect correlation for the last longitude.

Using dask for large datasets#

Normally we don’t run our correlation experiment on such small and easy-to-handle datasets. For larger datasets, we use dask for parallelisation.

Let’s create a larger dataset. It contains almost 2 gigabytes of uncompressed data, just small enough to be managed on a normal computer like mine, but certainly in a range where parallelisation should pay dividends.

large_da = create_dataarray(181, 360, 3650)
comparison_da = large_da.isel(lat=0, lon=-1)
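A quick back-of-the-envelope check of the size claim, using the shape we just created:

# 3650 × 181 × 360 float64 values at 8 bytes each
print(3650 * 181 * 360 * 8 / 1e9)  # ~1.9 GB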

Let’s first run this without dask to establish a baseline.

%%time
o = xr.apply_ufunc(
    my_pearson_v,
    large_da,
    comparison_da,
    input_core_dims=[('time',), ('time',)],
)
CPU times: user 22.1 s, sys: 26 ms, total: 22.1 s
Wall time: 22.2 s

In order to use dask, we need to create a dask cluster, which is done with this call:

if 'c' not in locals():
    c = Client(n_workers=4, threads_per_worker=1, memory_limit='3.5GB')
c

Client: Client-ea22e8dc-9af7-11ee-9b39-52aaff7ca351
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

By rechunking, we turn the large_da data array into a dask array; the chunks are the pieces that dask can compute in parallel.

large_da = large_da.chunk({'lat': 91, 'lon': 90})
large_da
<xarray.DataArray (time: 3650, lat: 181, lon: 360)>
dask.array<xarray-<this-array>, shape=(3650, 181, 360), dtype=float64, chunksize=(3650, 91, 90), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2000-01-01 2000-01-02 ... 2009-12-28
  * lat      (lat) float64 -90.0 -89.0 -88.0 -87.0 -86.0 ... 87.0 88.0 89.0 90.0
  * lon      (lon) float64 -179.0 -178.0 -177.0 -176.0 ... 178.0 179.0 180.0
Attributes:
    name:     random

For dask, we need two new parameters:

  • dask="parallelized" to tell the system to actually use dask, and

  • output_dtypes=["float"] to tell it what to expect as a result.

%%time
r = xr.apply_ufunc(
    my_pearson_v,
    large_da,
    comparison_da,
    input_core_dims=[('time',), ('time',)],
    dask='parallelized',
    output_dtypes=['float']
)
CPU times: user 2.63 ms, sys: 1.41 ms, total: 4.04 ms
Wall time: 3.54 ms

Because of lazy evaluation, the cell above returned almost immediately. It hasn’t done any of the calculations yet; it has only stored what it is supposed to do.

Only in the next cell, where we tell it to actually compute the output, will it take time.

%%time
p = r.compute()
p
CPU times: user 645 ms, sys: 953 ms, total: 1.6 s
Wall time: 11.3 s
<xarray.DataArray (lat: 181, lon: 360)>
array([[ 7.49156667e-03, -1.05611671e-02, -6.57951424e-03, ...,
        -2.13526211e-03, -6.70029486e-03,  1.00000000e+00],
       [ 1.45195946e-02,  3.92027344e-03,  1.36661369e-02, ...,
        -8.23021313e-03,  5.88698092e-03,  9.57717761e-03],
       [ 9.63523125e-03,  5.45873825e-04,  2.91139380e-03, ...,
         5.17681565e-03,  6.33934265e-03, -2.11432513e-03],
       ...,
       [ 2.99324695e-02,  2.14475632e-02, -2.38409887e-02, ...,
        -8.38419120e-04, -1.74913441e-02, -3.59145362e-02],
       [ 5.00181784e-03, -5.90834912e-03,  1.15881635e-02, ...,
         1.84094023e-02,  7.76888293e-03, -2.42987961e-02],
       [-3.11181858e-02,  3.26837453e-02, -1.08989192e-02, ...,
        -1.16345114e-02,  2.47818545e-02, -9.01637243e-03]])
Coordinates:
  * lat      (lat) float64 -90.0 -89.0 -88.0 -87.0 -86.0 ... 87.0 88.0 89.0 90.0
  * lon      (lon) float64 -179.0 -178.0 -177.0 -176.0 ... 178.0 179.0 180.0
# Ensure that the perfect correlation still exists 
# where we have extracted the actual comparison array
p.isel(lat=0, lon=-1).item()
0.9999999999999998

Dask’s Generalised Universal Functions#

dask.array has its own function, gufunc, similar to numpy.vectorize, that creates generalised universal functions optimised for dask parallelisation.

It is described in detail here. In addition to the parameters of numpy.vectorize we also give it two new parameters:

  • vectorize=True to tell it that the function still needs to be vectorised, and

  • output_dtypes=np.float64 similar to the call above: The type of the elements of the output arrays.

my_pearson_g = da.gufunc(
    my_pearson, 
    signature='(n),(n) -> ()',
    vectorize=True,
    output_dtypes=np.float64
)
r = xr.apply_ufunc(
    my_pearson_g,
    large_da,
    comparison_da,
    input_core_dims=[('time',), ('time',)],
    dask='allowed',
)
%%time
p2 = r.compute()
p2.isel(lat=0, lon=-1).item()
CPU times: user 511 ms, sys: 1.16 s, total: 1.67 s
Wall time: 8.2 s
0.9999999999999998

So even at this array size we see a clear performance improvement: the wall time dropped from about 22 seconds without dask to roughly 11 and 8 seconds with the two parallelised versions. As the data gets larger, maybe even too large to fit in memory, these universal functions make the difference between feasible and impossible.

Conclusion#

Xarray’s apply_ufunc method can seriously speed up calculations along dimensions in multi-dimensional xarray dataarrays.

This functionality is very powerful, but as always the power comes with added complexity.

I hope this short document helps you get into the right mindset and master your own universal functions.