Task¶
In this study, we explore scalable time-series forecasting in PySpark. We first build a single time-series model with Facebook Prophet, then train one Prophet model per store-item combination in parallel with PySpark: 500 series in the original data, later replicated tenfold to 5,000 series to stress-test the approach. The objective is to predict 3 months of item-level sales at each store location using 5 years of historical store-item sales data.
Data¶
- date - Date of the sale. There are no holiday effects or store closures.
- store - Store ID
- item - Item ID
- sales - Number of items sold at a particular store on a particular date.
from prophet import Prophet
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# Import the CSV file
df = pd.read_csv('./data/demand-forecasting-kernels-only/train.csv')
# View the first 5 entries
df.head()
|   | date       | store | item | sales |
|---|------------|-------|------|-------|
| 0 | 2013-01-01 | 1     | 1    | 13    |
| 1 | 2013-01-02 | 1     | 1    | 11    |
| 2 | 2013-01-03 | 1     | 1    | 14    |
| 3 | 2013-01-04 | 1     | 1    | 13    |
| 4 | 2013-01-05 | 1     | 1    | 10    |
# Describe the data frame
df.describe()
|       | store         | item          | sales         |
|-------|---------------|---------------|---------------|
| count | 913000.000000 | 913000.000000 | 913000.000000 |
| mean  | 5.500000      | 25.500000     | 52.250287     |
| std   | 2.872283      | 14.430878     | 28.801144     |
| min   | 1.000000      | 1.000000      | 0.000000      |
| 25%   | 3.000000      | 13.000000     | 30.000000     |
| 50%   | 5.500000      | 25.500000     | 47.000000     |
| 75%   | 8.000000      | 38.000000     | 70.000000     |
| max   | 10.000000     | 50.000000     | 231.000000    |
# Convert the date column to datetime
df['date'] = pd.to_datetime(df['date'])
# Extract year from date
df['year'] = df['date'].dt.year
# Aggregate daily sales by store
df_daily_store = df.groupby(['date', 'store'])['sales'].sum().unstack()
# Plot daily sales by store
plt.figure(figsize=(10,5))
plt.plot(df_daily_store)
plt.title('Daily Sales by Store', fontsize = 20)
plt.ylabel("Sales ($)", fontsize = 18)
plt.xticks(fontsize=15)
plt.yticks(fontsize=15)
plt.show()
# Aggregate daily sales by item
df_daily_item = df.groupby(['date', 'item'])['sales'].sum().unstack()
# Plot daily sales by item
plt.figure(figsize=(10,5))
plt.figure(figsize=(10,5))
plt.plot(df_daily_item)
plt.title('Daily Sales by Item', fontsize = 20)
plt.ylabel("Sales ($)", fontsize = 18)
plt.xticks(fontsize=15)
plt.yticks(fontsize=15)
plt.show()
Forecasting with Prophet¶
Methodology¶
We employ Prophet, an open-source time-series forecasting library developed by Facebook, designed to handle series with strong seasonal effects and changing historical trends. Prophet is particularly well suited to business time series such as sales, web traffic, or revenue, which often exhibit multiple seasonalities and holiday effects.
Additive Time Series Decomposition¶
Prophet models the time series using the following additive decomposition:
$$ y(t) = g(t) + s(t) + h(t) + \varepsilon_t $$
where:
- $g(t)$ is the trend function modeling non-periodic changes,
- $s(t)$ represents periodic seasonal effects (e.g., weekly, yearly),
- $h(t)$ represents effects of holidays or special events,
- $\varepsilon_t$ is the error term assumed to be normally distributed.
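The forecast frame returned by predict() exposes this decomposition directly, so the identity above can be checked numerically. A minimal sketch, assuming a fitted model left in the default additive mode (the column names are standard Prophet forecast output):
# Sketch: in additive mode, yhat is exactly g(t) + s(t) + h(t)
# (assumes `model` is a fitted Prophet instance with additive seasonality)
import numpy as np
fcst = model.predict(model.make_future_dataframe(periods=30))
recon = fcst['trend'] + fcst['additive_terms']  # g(t) plus s(t) + h(t)
assert np.allclose(recon, fcst['yhat'])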
Trend Component¶
Prophet supports two types of trend models:
- Piecewise linear: captures abrupt changes in trends at specified changepoints.
- Logistic growth: incorporates capacity constraints, modeling saturation effects.
Changepoints can be automatically detected or manually specified, allowing the model to adapt to structural breaks.
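Both trend types are selected at construction time. A sketch, with hypothetical changepoint dates and capacity:
from prophet import Prophet
# Piecewise-linear trend with manually specified changepoints
m_linear = Prophet(
    growth='linear',
    changepoints=['2015-03-01', '2016-09-01'],  # hypothetical break dates
    changepoint_prior_scale=0.1)                # larger = more flexible trend
# Logistic growth: the history must carry a capacity column named 'cap'
m_logistic = Prophet(growth='logistic')
# history['cap'] = 200  # hypothetical saturation level, set before fitting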
Seasonality¶
Prophet models seasonality using a Fourier series:
$$ s(t) = \sum_{n=1}^{N} \left[a_n \cos\left(\frac{2\pi n t}{P}\right) + b_n \sin\left(\frac{2\pi n t}{P}\right)\right] $$
where $P$ is the seasonal period (e.g., 365.25 for yearly seasonality). Prophet allows multiple seasonalities (e.g., weekly and yearly) and custom seasonalities.
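Custom seasonalities map directly onto this Fourier parameterization. A sketch (the monthly period and order below are illustrative, not tuned):
from prophet import Prophet
m = Prophet(weekly_seasonality=True, yearly_seasonality=True)
# Add a custom seasonal term with P = 30.5 days and N = 5 Fourier pairs
m.add_seasonality(name='monthly', period=30.5, fourier_order=5)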
Holiday Effects¶
Prophet incorporates user-specified holidays or events, treating them as indicator variables with estimated effects. This is especially helpful in contexts where external events influence the target variable.
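A sketch of both ways to supply events (the promotion dates are hypothetical):
import pandas as pd
from prophet import Prophet
# User-specified events, with a window of affected days around each date
promos = pd.DataFrame({
    'holiday': 'promo',
    'ds': pd.to_datetime(['2016-11-25', '2017-11-24']),  # hypothetical dates
    'lower_window': 0,
    'upper_window': 1})
m = Prophet(holidays=promos)
# Or layer on a country's built-in holiday calendar
m.add_country_holidays(country_name='US')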
Uncertainty Intervals¶
The model provides uncertainty intervals derived either from a full Bayesian sampling procedure or from uncertainty in the trend and residuals. These intervals appear as `yhat_lower` and `yhat_upper` in the forecast output.
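Both behaviors are controlled from the constructor. A sketch with illustrative values:
from prophet import Prophet
# 95% intervals instead of the 80% default; mcmc_samples > 0 switches from
# MAP estimation to full MCMC, adding seasonality uncertainty (much slower)
m = Prophet(interval_width=0.95, mcmc_samples=300)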
Forecasting Procedure¶
- Data Preparation: Input data is formatted with `ds` (timestamp) and `y` (value).
- Model Fitting: Prophet is trained on historical data, estimating trend, seasonality, and holiday effects.
- Forecast Generation: Future timestamps are provided, and the model outputs point forecasts and intervals.
- Visualization: Forecasts and components are visualized for interpretability.
This approach provides a flexible and interpretable framework for time series forecasting, balancing accuracy and usability without extensive parameter tuning.
help(Prophet)
Help on class Prophet in module prophet.forecaster:

class Prophet(builtins.object)
 |  Prophet(growth='linear', changepoints=None, n_changepoints=25, changepoint_range=0.8, yearly_seasonality='auto', weekly_seasonality='auto', daily_seasonality='auto', holidays=None, seasonality_mode='additive', seasonality_prior_scale=10.0, holidays_prior_scale=10.0, changepoint_prior_scale=0.05, mcmc_samples=0, interval_width=0.8, uncertainty_samples=1000, stan_backend=None, scaling: str = 'absmax', holidays_mode=None)
 |
 |  Prophet forecaster.
 |
 |  Parameters
 |  ----------
 |  growth: String 'linear', 'logistic' or 'flat' to specify a linear, logistic or
 |      flat trend.
 |  changepoints: List of dates at which to include potential changepoints. If
 |      not specified, potential changepoints are selected automatically.
 |  n_changepoints: Number of potential changepoints to include. Not used
 |      if input `changepoints` is supplied. If `changepoints` is not supplied,
 |      then n_changepoints potential changepoints are selected uniformly from
 |      the first `changepoint_range` proportion of the history.
 |  changepoint_range: Proportion of history in which trend changepoints will
 |      be estimated. Defaults to 0.8 for the first 80%. Not used if
 |      `changepoints` is specified.
 |  yearly_seasonality: Fit yearly seasonality.
 |      Can be 'auto', True, False, or a number of Fourier terms to generate.
 |  weekly_seasonality: Fit weekly seasonality.
 |      Can be 'auto', True, False, or a number of Fourier terms to generate.
 |  daily_seasonality: Fit daily seasonality.
 |      Can be 'auto', True, False, or a number of Fourier terms to generate.
 |  holidays: pd.DataFrame with columns holiday (string) and ds (date type)
 |      and optionally columns lower_window and upper_window which specify a
 |      range of days around the date to be included as holidays.
 |      lower_window=-2 will include 2 days prior to the date as holidays. Also
 |      optionally can have a column prior_scale specifying the prior scale for
 |      that holiday.
 |  seasonality_mode: 'additive' (default) or 'multiplicative'.
 |  seasonality_prior_scale: Parameter modulating the strength of the
 |      seasonality model. Larger values allow the model to fit larger seasonal
 |      fluctuations, smaller values dampen the seasonality. Can be specified
 |      for individual seasonalities using add_seasonality.
 |  holidays_prior_scale: Parameter modulating the strength of the holiday
 |      components model, unless overridden in the holidays input.
 |  changepoint_prior_scale: Parameter modulating the flexibility of the
 |      automatic changepoint selection. Large values will allow many
 |      changepoints, small values will allow few changepoints.
 |  mcmc_samples: Integer, if greater than 0, will do full Bayesian inference
 |      with the specified number of MCMC samples. If 0, will do MAP
 |      estimation.
 |  interval_width: Float, width of the uncertainty intervals provided
 |      for the forecast. If mcmc_samples=0, this will be only the uncertainty
 |      in the trend using the MAP estimate of the extrapolated generative
 |      model. If mcmc.samples>0, this will be integrated over all model
 |      parameters, which will include uncertainty in seasonality.
 |  uncertainty_samples: Number of simulated draws used to estimate
 |      uncertainty intervals. Settings this value to 0 or False will disable
 |      uncertainty estimation and speed up the calculation.
 |  stan_backend: str as defined in StanBackendEnum default: None - will try to
 |      iterate over all available backends and find the working one
 |  holidays_mode: 'additive' or 'multiplicative'. Defaults to seasonality_mode.
# Baseline: a single Prophet model for one series
# Store 1, item 1 data
s1i1 = df[(df.item == 1) & (df.store == 1)]
# Select columns
s1i1 = s1i1[["date", "sales"]]
# Prophet requires the time and target columns to be named 'ds' and 'y'
s1i1.columns = ["ds", "y"]
# Instantiate the model; multiplicative seasonality lets the weekly and
# yearly swings scale with the level of the trend
model = Prophet(
    interval_width=0.95,
    growth='linear',
    daily_seasonality=False,
    weekly_seasonality=True,
    yearly_seasonality=True,
    seasonality_mode='multiplicative'
)
# fit the model to historical data
model.fit(s1i1);
21:26:34 - cmdstanpy - INFO - Chain [1] start processing
21:26:34 - cmdstanpy - INFO - Chain [1] done processing
# Extend the frame 90 days past the end of the history (through March 2018),
# keeping the historical dates for in-sample comparison
future = model.make_future_dataframe(
    periods=90,
    freq='d',
    include_history=True)
# Forecast over the history and the 90 future days
forecast = model.predict(future)
# Plot the forecast with its uncertainty band
fig = model.plot(forecast, figsize=(10,5))
ax = fig.gca()
ax.set_title("Out-of-Sample Forecast", size=20)
ax.set_xlabel("Date", size=18)
ax.set_ylabel("Sales", size=18)
ax.tick_params(axis='x', rotation=45, labelsize=15)
ax.set_xlim(pd.to_datetime(['2017-01-01', '2018-03-31']))
plt.show();
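Before scaling out, a quick sanity check of fit quality is cheap. A minimal sketch (not part of the original run) computing the in-sample MAE from the historical portion of the forecast frame:
# Join actuals onto the forecast and average |y - yhat| over the history
check = forecast.merge(s1i1, on='ds', how='inner')  # history rows only
mae = (check['y'] - check['yhat']).abs().mean()
print(f"In-sample MAE for store 1, item 1: {mae:.2f}")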
Let's use Spark to parallelize forecasting¶
import findspark
findspark.init('/opt/apps/SPARK3/spark-current')
# Then you could import the `pyspark` module
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Forecasting") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "14g") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/19 17:58:41 WARN [Thread-6] Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/19 17:58:41 WARN [Thread-6] Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/05/19 17:58:42 WARN [Thread-6] Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
import pandas as pd
# Import the csv file and explore it
sales_pd = pd.read_csv('./data/demand-forecasting-kernels-only/train.csv')
# Convert the date column to datetime
sales_pd['date'] = pd.to_datetime(sales_pd['date'])
# Display info
sales_pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 913000 entries, 0 to 912999
Data columns (total 4 columns):
 #   Column  Non-Null Count   Dtype
---  ------  --------------   -----
 0   date    913000 non-null  datetime64[ns]
 1   store   913000 non-null  int64
 2   item    913000 non-null  int64
 3   sales   913000 non-null  int64
dtypes: datetime64[ns](1), int64(3)
memory usage: 27.9 MB
# Replicate the data 10 times with offset store IDs:
# 100 stores x 50 items = 5,000 store-item series
sales_pd_10k = pd.DataFrame()
for i in range(0, 10):
    temp_pd = sales_pd.copy()
    temp_pd['store'] = temp_pd['store'] + (10 * i)
    sales_pd_10k = pd.concat([sales_pd_10k, temp_pd])
    print('added data frame', i + 1)
added data frame 1
added data frame 2
added data frame 3
added data frame 4
added data frame 5
added data frame 6
added data frame 7
added data frame 8
added data frame 9
added data frame 10
# Check the number of unique groups
sales_pd_10k[['item', 'store']].nunique()
item      50
store    100
dtype: int64
# Create a Spark DataFrame from the pandas data frame
sales_df = spark.createDataFrame(sales_pd_10k)
# Display the schema
sales_df.printSchema()
root
 |-- date: timestamp (nullable = true)
 |-- store: long (nullable = true)
 |-- item: long (nullable = true)
 |-- sales: long (nullable = true)
sales_df = sales_df.withColumnRenamed("date", "ds").withColumnRenamed("sales", "y")
# Repartition so all rows for a store-item group are colocated
sales_df.createOrReplaceTempView("item_sales")
sql = "select * from item_sales"
sales_part = (spark.sql(sql)
              .repartition(spark.sparkContext.defaultParallelism,
                           ['store', 'item'])).cache()
sales_part.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [ds#752, store#1L, item#2L, y#757L]
      +- InMemoryRelation [ds#752, store#1L, item#2L, y#757L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- AdaptiveSparkPlan isFinalPlan=false
               +- Exchange hashpartitioning(store#1L, item#2L, 8), REPARTITION_BY_NUM, [plan_id=268]
                  +- Project [date#0 AS ds#752, store#1L, item#2L, sales#3L AS y#757L]
                     +- Scan ExistingRDD[date#0,store#1L,item#2L,sales#3L]
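A quick check, assuming the session above, that the repartitioning landed where expected:
# The Exchange in the plan hash-partitions on (store, item) into
# defaultParallelism partitions
print(spark.sparkContext.defaultParallelism)
print(sales_part.rdd.getNumPartitions())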
# Define a schema
from pyspark.sql.types import StructType, StructField, DoubleType, FloatType, IntegerType, TimestampType
schema = StructType([
    StructField('store', IntegerType()),
    StructField('item', IntegerType()),
    StructField('ds', TimestampType()),
    StructField('y', FloatType()),
    StructField('yhat', DoubleType()),
    StructField('yhat_upper', DoubleType()),
    StructField('yhat_lower', DoubleType()),
])
from pyspark.sql.functions import pandas_udf, PandasUDFType
# Define the grouped-map Pandas UDF (this decorator style is deprecated in
# Spark 3; see the applyInPandas sketch after the apply call below)
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(store_pd):
    # Instantiate the model with the same parameters as the single-series run
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative')
    # Fit the model to this group's history
    model.fit(store_pd)
    # Extend the frame 90 days past the end of the history
    future = model.make_future_dataframe(
        periods=90,
        freq='d',
        include_history=True)
    # Forecast over the history and the 90 future days
    future = model.predict(future)
    # Assemble a frame with store, item, ds, y, and the forecast columns
    f_pd = future[['ds', 'yhat', 'yhat_upper', 'yhat_lower']]
    st_pd = store_pd[['ds', 'store', 'item', 'y']]
    result_pd = f_pd.join(st_pd.set_index('ds'), on='ds', how='left')
    # The 90 future rows have no store/item after the join; fill from the group
    result_pd['store'] = store_pd['store'].iloc[0]
    result_pd['item'] = store_pd['item'].iloc[0]
    return result_pd[['store', 'item', 'ds', 'y', 'yhat',
                      'yhat_upper', 'yhat_lower']]
# Prophet must be importable in the worker processes that run the UDF
from prophet import Prophet
# Apply the function to all 5,000 store-item groups
results = sales_part.groupby(['store', 'item']).apply(apply_model)
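The decorator form above still runs on Spark 3 but is deprecated; the modern equivalent passes a plain function to applyInPandas. A sketch under that assumption (forecast_store_item and results_v2 are names introduced here; same schema and model settings as above):
# Sketch: Spark 3 applyInPandas equivalent of the GROUPED_MAP UDF
def forecast_store_item(store_pd):
    m = Prophet(interval_width=0.95, growth='linear',
                daily_seasonality=False, weekly_seasonality=True,
                yearly_seasonality=True, seasonality_mode='multiplicative')
    m.fit(store_pd)
    fcst = m.predict(m.make_future_dataframe(periods=90, freq='d'))
    out = fcst[['ds', 'yhat', 'yhat_upper', 'yhat_lower']].merge(
        store_pd[['ds', 'store', 'item', 'y']], on='ds', how='left')
    out['store'] = store_pd['store'].iloc[0]  # fill the future rows
    out['item'] = store_pd['item'].iloc[0]
    return out[['store', 'item', 'ds', 'y', 'yhat', 'yhat_upper', 'yhat_lower']]
results_v2 = sales_part.groupBy('store', 'item').applyInPandas(
    forecast_store_item, schema=schema)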
# Trigger computation and time it
import timeit
start = timeit.default_timer()
results.show()
stop = timeit.default_timer()
# Note: show() computes only enough partitions to produce 20 rows, so this
# is a lower bound on the time to fit all 5,000 models
print('Time: ', stop - start)
25/05/19 18:38:51 WARN [dispatcher-CoarseGrainedScheduler] TaskSetManager: Stage 13 contains a task of very large size (20091 KiB). The maximum recommended task size is 1000 KiB.
+-----+----+-------------------+----+------------------+------------------+-------------------+
|store|item|                 ds|   y|              yhat|        yhat_upper|         yhat_lower|
+-----+----+-------------------+----+------------------+------------------+-------------------+
|    1|   9|2013-01-01 00:00:00|18.0|-3.607896305397453|14.750340069852319|-24.286379175981143|
|    1|   9|2013-01-02 00:00:00|23.0| 9.197956489899948|28.894728349753557|-11.102056787297881|
|    1|   9|2013-01-03 00:00:00|25.0|0.6964563505207579|20.239050758031148|-19.173274241327878|
|    1|   9|2013-01-04 00:00:00|22.0| 7.657071034495528|26.429187163228722| -11.59913880373706|
|    1|   9|2013-01-05 00:00:00|29.0|14.581207709029172|  34.5988033088096| -5.085065610858691|
|    1|   9|2013-01-06 00:00:00|35.0|18.033210853244938| 39.33289301040418|-1.7557855759833572|
|    1|   9|2013-01-07 00:00:00|24.0| 8.419122271729004|29.581678294382268|-11.313345323624675|
|    1|   9|2013-01-08 00:00:00|22.0| 6.623896742088943|24.461133289004323|-12.747719230449235|
|    1|   9|2013-01-09 00:00:00|19.0|19.954926624356546|39.057599977479605| 1.0516450647791848|
|    1|   9|2013-01-10 00:00:00|30.0| 10.88069617701435|31.157738793358906|  -9.28365313474917|
|    1|   9|2013-01-11 00:00:00|20.0| 17.73559274997684| 37.42949891123449| -1.322218511608456|
|    1|   9|2013-01-12 00:00:00|38.0| 24.36210134772759| 44.37117536210255|  5.469068905327186|
|    1|   9|2013-01-13 00:00:00|32.0| 27.18815454501502|45.876863474827026|  8.394169515742707|
|    1|   9|2013-01-14 00:00:00|24.0|16.235882394122264| 36.71888977631134|-4.7508029022662654|
|    1|   9|2013-01-15 00:00:00|24.0|13.274398278041616|31.903686620615517|  -6.65221028828473|
|    1|   9|2013-01-16 00:00:00|22.0| 25.92289397128175| 45.28310530349508|  5.695235228483467|
|    1|   9|2013-01-17 00:00:00|33.0|15.133599371122099| 36.18001501941159|-3.9767735457222284|
|    1|   9|2013-01-18 00:00:00|24.0|20.832291056892476|39.441860567342644|  2.797502819607014|
|    1|   9|2013-01-19 00:00:00|23.0|26.227287312779232|45.991121883892774|  6.548593856776069|
|    1|   9|2013-01-20 00:00:00|34.0| 27.63251410425226|  46.8624270261929|  9.198575683120033|
+-----+----+-------------------+----+------------------+------------------+-------------------+
only showing top 20 rows

Time:  9.298863128293306
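Because show() stops once it has 20 rows to display, timing a full run requires an action that touches every group. A sketch using count(), which forces all 5,000 models to be fitted (expect a much longer wall time):
# Force every store-item model to be fitted by counting all output rows
start = timeit.default_timer()
n_rows = results.count()
stop = timeit.default_timer()
print(n_rows, 'forecast rows in', round(stop - start, 1), 's')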
Lab¶
- Use another univariate time-series forecasting model.
- Calculate the average forecasting accuracy.
- Parallelize it in PySpark or sparklyr.