ENH: Support online operations in windowing operations · Issue #41673 · pandas-dev/pandas

Problem

Currently, calculating a windowing aggregation over several pandas objects requires concatenating them together and recomputing the aggregation over the entire result:

pd.concat([df1, df2, df3, ...]).rolling(10).mean()

By holding the latest state of a windowing aggregation, it is possible to calculate the above more efficiently (and flexibly) in an online fashion, e.g.

roll = df1.rolling(10)
roll.mean()
roll.mean(update=df2)
roll.mean(update=df3)
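To illustrate what "holding the latest state" buys, here is a minimal pure-Python sketch (not pandas code) of an online rolling mean. The class name `OnlineRollingMean` and its `mean(update)` method are hypothetical; the sketch assumes a fixed integer window, no missing values, and the default `min_periods=window`. Only the last `window` values and their running sum are kept, so each update costs time proportional to the size of the update rather than to the full history.

```python
import math
from collections import deque


class OnlineRollingMean:
    """Hypothetical online rolling mean over a fixed-size window.

    Assumes no missing values and min_periods == window (the pandas default
    for an integer window).
    """

    def __init__(self, window):
        self.window = window
        self._buffer = deque(maxlen=window)  # last `window` values seen so far
        self._total = 0.0                    # running sum of the buffer

    def mean(self, update):
        results = []
        for x in update:
            if len(self._buffer) == self.window:
                # append() below will evict the oldest value; drop it from the sum.
                self._total -= self._buffer[0]
            self._buffer.append(x)
            self._total += x
            # Emit NaN until a full window has been observed.
            if len(self._buffer) < self.window:
                results.append(math.nan)
            else:
                results.append(self._total / self.window)
        return results


roll = OnlineRollingMean(window=2)
print(roll.mean([1.0, 2.0, 3.0]))  # [nan, 1.5, 2.5]
print(roll.mean([4.0, 5.0]))       # [3.5, 4.5]
```

A running sum can accumulate floating-point error over long streams; a production implementation might periodically recompute the window sum from the buffer.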

Aiming to eventually target:

rolling/expanding/ewm
groupby.rolling/groupby.expanding/groupby.ewm

As windowing operations are adopting numba, this would be a numba-only feature.

Important API features

  1. Independent state per aggregation (e.g. an update passed to rolling mean should not affect the state of rolling variance)
  2. Independent numba engine configuration (e.g. a user should be able to have rolling mean call nogil=True while also having rolling variance call nogil=False)
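A minimal sketch of these two requirements, using a hypothetical plain-Python `OnlineRolling` class rather than any pandas API: each aggregation name gets its own buffer and its own recorded `engine_kwargs`, so feeding data to `mean` leaves the `var` state untouched. The engine options are only stored here, since the sketch does not use numba.

```python
import math
import statistics
from collections import deque


class OnlineRolling:
    """Hypothetical rolling object whose aggregations keep separate state."""

    def __init__(self, window):
        self.window = window
        self._state = {}          # aggregation name -> deque of recent values
        self._engine_kwargs = {}  # aggregation name -> engine options it uses

    def _windows(self, name, update, engine_kwargs):
        # Each aggregation gets its own buffer, created on first use.
        buf = self._state.setdefault(name, deque(maxlen=self.window))
        # Each aggregation keeps its own engine options (e.g. {"nogil": True});
        # they are only recorded here, since this sketch does not use numba.
        if engine_kwargs is not None:
            self._engine_kwargs[name] = dict(engine_kwargs)
        windows = []
        for x in update:
            buf.append(x)
            # A full window, or None while the window is still filling.
            windows.append(list(buf) if len(buf) == self.window else None)
        return windows

    def mean(self, update, engine_kwargs=None):
        return [math.nan if w is None else sum(w) / len(w)
                for w in self._windows("mean", update, engine_kwargs)]

    def var(self, update, engine_kwargs=None):
        # Sample variance (ddof=1), like the pandas default; needs window >= 2.
        return [math.nan if w is None else statistics.variance(w)
                for w in self._windows("var", update, engine_kwargs)]


roll = OnlineRolling(window=2)
print(roll.mean([1.0, 2.0, 3.0], engine_kwargs={"nogil": True}))  # [nan, 1.5, 2.5]
print(roll.var([10.0, 14.0], engine_kwargs={"nogil": False}))     # [nan, 8.0]
print(roll.mean([5.0]))  # [4.0]: the mean state still ends with 3.0
```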

Proposed API options (example with ExponentialMovingWindow)

  1. ExponentialMovingWindow automatically holds the state of an aggregation call, and an update keyword is added to the aggregation methods to pass in new data
In [1]: s = pd.Series([0, 1, 2, np.nan, 4])

In [2]: ewm = s.head(1).ewm(com=0.5)

# Prime the first resulting state
In [3]: ewm.mean(engine="numba")
Out[3]:
0    0.0
dtype: float64

In [4]: ewm.mean(engine="numba", update=s.iloc[1:4])
Out[4]:
1    0.750000
2    1.615385
3    1.615385
dtype: float64

In [5]: ewm.mean(engine="numba", update=s.tail(1))
Out[5]:
4    3.670213
dtype: float64

# Clear the latest ewm state
In [6]: ewm.reset_update()

# Re-prime the first resulting state
In [7]: ewm.mean(engine="numba")
Out[7]:
0    0.0
dtype: float64

In [8]: ewm.mean(engine="numba", update=s.tail(4))
Out[8]:
1    0.750000
2    1.615385
3    1.615385
4    3.670213
dtype: float64
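The EWM outputs above can be reproduced by carrying just two numbers between calls: the decayed weighted sum of observations and the decayed sum of weights. Below is a hedged pure-Python sketch for the `adjust=True`, `ignore_na=False` case with `alpha = 1 / (1 + com)`; the function name `ewm_mean_online` and its `(weighted_sum, weight_total, n_obs)` state tuple are hypothetical, not a pandas API.

```python
import math


def ewm_mean_online(values, com, state=None):
    """Hypothetical adjust=True, ignore_na=False EWM mean that can resume
    from the state returned by a previous call (state=None starts fresh).

    Returns (results, new_state).
    """
    decay = 1.0 - 1.0 / (1.0 + com)  # 1 - alpha, with alpha = 1 / (1 + com)
    num, den, nobs = state if state is not None else (0.0, 0.0, 0)
    results = []
    for x in values:
        # With ignore_na=False, older weights decay at every position,
        # including positions that hold NaN.
        num *= decay
        den *= decay
        if not math.isnan(x):
            num += x
            den += 1.0
            nobs += 1
        # min_periods=1: emit a value once at least one observation was seen.
        results.append(num / den if nobs >= 1 else math.nan)
    return results, (num, den, nobs)


s = [0.0, 1.0, 2.0, math.nan, 4.0]
out0, state = ewm_mean_online(s[:1], com=0.5)  # prime the first state
out1, state = ewm_mean_online(s[1:4], com=0.5, state=state)
out2, state = ewm_mean_online(s[4:], com=0.5, state=state)
print([round(v, 6) for v in out0 + out1 + out2])
# [0.0, 0.75, 1.615385, 1.615385, 3.670213]
```

In this sketch, passing `state=None` plays the role of `reset_update()`.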
  2. A new online method on ewm that returns a new OnlineExponentialMovingWindow object. The aggregation methods on this object then support an update keyword.
In [6]: s = pd.Series([0, 1, 2, np.nan, 4])

# This will raise if numba is not installed
In [7]: ewm_online = s.head(1).ewm(com=0.5).online()

In [8]: ewm_online
Out[8]: OnlineExponentialMovingWindow [com=0.5,min_periods=1,adjust=True,ignore_na=False,axis=0]

# Prime the first resulting state
In [10]: ewm_online.mean()
Out[10]:
0    0.0
dtype: float64

In [11]: ewm_online.mean(update=s.iloc[1:4])
Out[11]:
1    0.750000
2    1.615385
3    1.615385
dtype: float64

# Clear the latest ewm state
In [12]: ewm_online.reset()

# Re-prime the first resulting state
In [13]: ewm_online.mean()
Out[13]:
0    0.0
dtype: float64

In [14]: ewm_online.mean(update=s.tail(4))
Out[14]:
1    0.750000
2    1.615385
3    1.615385
4    3.670213
dtype: float64
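Option 2 separates configuration from state. The sketch below mimics that split in plain Python under stated assumptions: the hypothetical `EWMSpec` stands in for `ExponentialMovingWindow` and only stores parameters plus the initial data, while the hypothetical `OnlineEWMMean` stands in for `OnlineExponentialMovingWindow` and owns the state. It covers only `mean` with `adjust=True` and `ignore_na=False`, and calling `mean()` without `update` is assumed to re-prime from the initial data.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class EWMSpec:
    """Hypothetical stand-in for ExponentialMovingWindow: parameters only, no state."""
    data: tuple
    com: float

    def online(self):
        # Every call returns a fresh object with its own independent state.
        return OnlineEWMMean(self.data, self.com)


class OnlineEWMMean:
    """Hypothetical stand-in for OnlineExponentialMovingWindow (mean only,
    adjust=True, ignore_na=False, min_periods=1)."""

    def __init__(self, data, com):
        self._data = list(data)
        self._decay = 1.0 - 1.0 / (1.0 + com)  # 1 - alpha
        self.reset()

    def reset(self):
        # Clear the latest state: weighted sum, sum of weights, observation count.
        self._num, self._den, self._nobs = 0.0, 0.0, 0

    def mean(self, update=None):
        if update is None:
            # Assumption: no update means "re-prime from the initial data".
            self.reset()
            update = self._data
        out = []
        for x in update:
            # Weights decay at every position, including NaN ones (ignore_na=False).
            self._num *= self._decay
            self._den *= self._decay
            if not math.isnan(x):
                self._num += x
                self._den += 1.0
                self._nobs += 1
            out.append(self._num / self._den if self._nobs else math.nan)
        return out


ewm_online = EWMSpec(data=(0.0,), com=0.5).online()
print(ewm_online.mean())  # [0.0]
print([round(v, 6) for v in ewm_online.mean(update=[1.0, 2.0, math.nan])])
# [0.75, 1.615385, 1.615385]
```

Because each `online()` call returns a fresh object, two online views of the same window cannot interfere with each other, which is one way to satisfy the independent-state requirement.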

Considered API Options

  1. Create a new object per windowing method + aggregation, but this would lead to too many individual classes
  2. Use streamz as an optional dependency for online calculations. However, streamz has its own full API workflow for this kind of operation, which may be difficult to reuse. Also, we would ideally want this to start as a numba-accelerated operation. cc @mrocklin

@pandas-dev/pandas-core