Bayesian parameter estimation for an LG-SSM using HMC#

We show how to use the blackjax libray to compute the parameter posterior \(p(\theta|y(1:T))\) for an LGSSM model. We use the Kalman filter to compute the marginal likelihood, \(p(y(1:T) | \theta) = \int_{z(1:T)} p(z(1:T), y(1:T)|\theta)\).

Setup#

%%capture
try:
    import dynamax
except ModuleNotFoundError:
    print('installing dynamax')
    %pip install -q dynamax[notebooks]
    import dynamax
try:
    import blackjax
except ModuleNotFoundError:
    print('installing blackjax')
    %pip install -qq blackjax
    import blackjax
from jax import random as  jr
from jax import numpy as jnp
from jax import jit, vmap
from itertools import count

from dynamax.linear_gaussian_ssm import LinearGaussianSSM
from dynamax.parameters import log_det_jac_constrain

import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['figure.figsize'] = [16, 9]

Generate synthetic training data#

# Simulate synthetic data from true model
state_dim = 2
emission_dim = 10
num_timesteps = 100
keys = map(jr.PRNGKey, count())

true_model = LinearGaussianSSM(state_dim, emission_dim)
true_params, _ = true_model.initialize(next(keys))
true_states, emissions = true_model.sample(true_params, next(keys), num_timesteps)
def plot_results(emissions, smoothed_emissions, smoothed_emissions_std):
    # all arrays are (T, E) dimensional, T=ntime, E=emission_dim
    spc = 3
    plt.figure(figsize=(10, 4))
    for i in range(emission_dim):
        plt.plot(emissions[:, i] + spc * i, "--k", label="observed" if i == 0 else None)
        ln = plt.plot(smoothed_emissions[:, i] + spc * i,
                    label="smoothed" if i == 0 else None)[0]
        plt.fill_between(
            jnp.arange(num_timesteps),
            spc * i + smoothed_emissions[:, i] - 2 * smoothed_emissions_std[:, i],
            spc * i + smoothed_emissions[:, i] + 2 * smoothed_emissions_std[:, i],
            color=ln.get_color(),
            alpha=0.25,
        )
    plt.xlabel("time")
    plt.xlim(0, num_timesteps - 1)
    plt.ylabel("true and predicted emissions")
    plt.legend()
plot_results(emissions, emissions, 0.1*jnp.ones_like(emissions)) # fake posterior variance
../../_images/6b23cfeb8c298f01b0a0f98bd509670f21363d07cddd549e25b131758f458b58.png

Baseline method: use EM to compute MLE#

# Initialize parameters by fitting EM algorithm
num_iters = 100
test_model = LinearGaussianSSM(state_dim, emission_dim)
initial_params, param_props = test_model.initialize(next(keys))
fitted_params, marginal_lls = test_model.fit_em(initial_params, param_props, emissions, num_iters=num_iters)

# Extract fitted params
fitted_C = fitted_params.emissions.weights
fitted_d = fitted_params.emissions.bias
fitted_R = fitted_params.emissions.cov

# Compute predicted emissions
posterior = test_model.smoother(fitted_params, emissions)
smoothed_emissions_mean = posterior.smoothed_means @ fitted_C.T + fitted_d
smoothed_emissions_cov = fitted_C @ posterior.smoothed_covariances @ fitted_C.T + fitted_R
smoothed_emissions_std = jnp.sqrt(
    jnp.array([smoothed_emissions_cov[:, i, i] for i in range(emission_dim)])).T # (T,E)
100.00% [100/100 00:00<00:00]
print([emissions.shape, smoothed_emissions_mean.shape, smoothed_emissions_std.shape])
plot_results(emissions, smoothed_emissions_mean, smoothed_emissions_std)
[(100, 10), (100, 10), (100, 10)]
../../_images/59278162fcff8a9cac6f9d77d0833634f74a33fadd3e5be996d6607b49de8a58.png

Implement HMC wrapper#

from dynamax.parameters import to_unconstrained, from_unconstrained
from dynamax.utils.utils import pytree_stack, ensure_array_has_batch_dim
from functools import partial
from fastprogress.fastprogress import progress_bar

def fit_hmc(model,
            initial_params,
            props,
            key,
            num_samples,
            emissions,
            inputs=None,
            warmup_steps=100,
            num_integration_steps=30,
            verbose=True):
    """Sample parameters of the model using HMC."""
    # Make sure the emissions and inputs have batch dimensions
    batch_emissions = ensure_array_has_batch_dim(emissions, model.emission_shape)
    batch_inputs = ensure_array_has_batch_dim(inputs, model.inputs_shape)

    initial_unc_params = to_unconstrained(initial_params, props)

    # The log likelihood that the HMC samples from
    def _logprob(unc_params):
        params = from_unconstrained(unc_params, props)
        batch_lls = vmap(partial(model.marginal_log_prob, params))(batch_emissions, batch_inputs)
        lp = model.log_prior(params) + batch_lls.sum()
        lp += log_det_jac_constrain(params, props)
        return lp

    # Initialize the HMC sampler using window_adaptation
    warmup = blackjax.window_adaptation(blackjax.hmc,
                                        _logprob,
                                        num_steps=warmup_steps,
                                        num_integration_steps=num_integration_steps,
                                        progress_bar=verbose)
    init_key, key = jr.split(key)
    hmc_initial_state, hmc_kernel, _ = warmup.run(init_key, initial_unc_params)

    @jit
    def hmc_step(hmc_state, step_key):
        next_hmc_state, _ = hmc_kernel(step_key, hmc_state)
        params = from_unconstrained(hmc_state.position, props)
        return next_hmc_state, params

    # Start sampling
    log_probs = []
    samples = []
    hmc_state = hmc_initial_state
    pbar = progress_bar(range(num_samples)) if verbose else range(num_samples)
    for _ in pbar:
        step_key, key = jr.split(key)
        hmc_state, params = hmc_step(hmc_state, step_key)
        log_probs.append(-hmc_state.potential_energy)
        samples.append(params)

    # Combine the samples into a single pytree
    return pytree_stack(samples), jnp.array(log_probs)

Call HMC#

sample_size = 500
param_samples, lps = fit_hmc(test_model, initial_params, param_props, next(keys), sample_size, emissions, num_integration_steps=30)
Running window adaptation
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[10], line 2
      1 sample_size = 500
----> 2 param_samples, lps = fit_hmc(test_model, initial_params, param_props, next(keys), sample_size, emissions, num_integration_steps=30)

Cell In[9], line 38, in fit_hmc(model, initial_params, props, key, num_samples, emissions, inputs, warmup_steps, num_integration_steps, verbose)
     32 warmup = blackjax.window_adaptation(blackjax.hmc,
     33                                     _logprob,
     34                                     num_steps=warmup_steps,
     35                                     num_integration_steps=num_integration_steps,
     36                                     progress_bar=verbose)
     37 init_key, key = jr.split(key)
---> 38 hmc_initial_state, hmc_kernel, _ = warmup.run(init_key, initial_unc_params)
     40 @jit
     41 def hmc_step(hmc_state, step_key):
     42     next_hmc_state, _ = hmc_kernel(step_key, hmc_state)

File /opt/hostedtoolcache/Python/3.9.18/x64/lib/python3.9/site-packages/blackjax/adaptation/window_adaptation.py:335, in window_adaptation.<locals>.run(rng_key, position, num_steps)
    333 keys = jax.random.split(rng_key, num_steps)
    334 schedule = build_schedule(num_steps)
--> 335 last_state, info = jax.lax.scan(
    336     one_step_,
    337     (init_state, init_adaptation_state),
    338     (jnp.arange(num_steps), keys, schedule),
    339 )
    340 last_chain_state, last_warmup_state, *_ = last_state
    342 step_size, inverse_mass_matrix = adapt_final(last_warmup_state)

    [... skipping hidden 21 frame]

File /opt/hostedtoolcache/Python/3.9.18/x64/lib/python3.9/site-packages/blackjax/progress_bar.py:97, in progress_bar_scan.<locals>._progress_bar_scan.<locals>.wrapper_progress_bar(carry, x)
     95     iter_num = x
     96 _update_progress_bar(iter_num)
---> 97 result = func(carry, x)
     98 return close_bar(result, iter_num)

File /opt/hostedtoolcache/Python/3.9.18/x64/lib/python3.9/site-packages/blackjax/adaptation/window_adaptation.py:303, in window_adaptation.<locals>.one_step(carry, xs)
    300 _, rng_key, adaptation_stage = xs
    301 state, adaptation_state = carry
--> 303 new_state, info = mcmc_kernel(
    304     rng_key,
    305     state,
    306     logdensity_fn,
    307     adaptation_state.step_size,
    308     adaptation_state.inverse_mass_matrix,
    309     **extra_parameters,
    310 )
    311 new_adaptation_state = adapt_step(
    312     adaptation_state,
    313     adaptation_stage,
    314     new_state.position,
    315     info.acceptance_rate,
    316 )
    318 return (
    319     (new_state, new_adaptation_state),
    320     AdaptationInfo(new_state, info, new_adaptation_state),
    321 )

TypeError: kernel() got an unexpected keyword argument 'num_steps'
plt.plot(lps)
plt.xlabel("iteration")
plt.xlabel("log probability")
Text(0.5, 0, 'log probability')
../../_images/867fb9253f2d2bd9544ec24b563d51e6dac91cc881bdc94e9f4f6259c5b48e84.png
@jit
def smooth_emission(params):
    C = params.emissions.weights
    d = params.emissions.bias
    posterior = test_model.smoother(params, emissions)
    return posterior.smoothed_means @ C.T + d

smoothed_emissions = vmap(smooth_emission)(param_samples)
smoothed_emissions_mean = smoothed_emissions.mean(axis=0)
smoothed_emissions_std = jnp.std(smoothed_emissions, axis=0)

print([emissions.shape, smoothed_emissions.shape, smoothed_emissions_mean.shape, smoothed_emissions_std.shape])
[(100, 10), (500, 100, 10), (100, 10), (100, 10)]
plot_results(emissions, smoothed_emissions_mean, smoothed_emissions_std)
../../_images/056916c469be027c04d1ed13d5d1846d2bf88c2bbdcc4d0c16e36ef57c5e855f.png

Use HMC to infer posterior over a subset of the parameters#

We freeze the transition parameters and initial parameters, so that only covariance matrices are learned. This is useful for structural time series models (see e.g., sts-jax library, which builds on dynamax.).

# Freeze transition parameters and initial parameters, so that only covariance matrices are learned

test_model = LinearGaussianSSM(state_dim, emission_dim)
test_params, test_param_props = test_model.initialize(next(keys),
                                                      dynamics_weights=true_params.dynamics.weights,
                                                      dynamics_bias=true_params.dynamics.bias,
                                                      emission_weights=true_params.emissions.weights,
                                                      emission_bias=true_params.emissions.bias)

# Set transition parameters and initial parameters to true values and mark as frozen
test_param_props.dynamics.weights.trainable = False
test_param_props.dynamics.bias.trainable = False
test_param_props.emissions.weights.trainable = False
test_param_props.emissions.bias.trainable = False
sample_size = 500
param_samples, lps = fit_hmc(test_model, test_params, test_param_props, next(keys), sample_size, emissions, num_integration_steps=30)

plt.plot(lps)
plt.xlabel("iteration")
plt.xlabel("log probability")
Running window adaptation
100.00% [100/100 00:00<?]

100.00% [500/500 00:35<00:00]
Text(0.5, 0, 'log probability')
../../_images/836fcbea0be86faac291446320ccaf16fa5a00ff92b89c418010cf123ef65efe.png
@jit
def smooth_emission(params):
    C = params.emissions.weights
    d = params.emissions.bias
    posterior = test_model.smoother(params, emissions)
    return posterior.smoothed_means @ C.T + d

smoothed_emissions = vmap(smooth_emission)(param_samples)
smoothed_emissions_mean = smoothed_emissions.mean(axis=0)
smoothed_emissions_std = jnp.std(smoothed_emissions, axis=0)

print([emissions.shape, smoothed_emissions.shape, smoothed_emissions_mean.shape, smoothed_emissions_std.shape])
[(100, 10), (500, 100, 10), (100, 10), (100, 10)]
plot_results(emissions, smoothed_emissions_mean, smoothed_emissions_std)
../../_images/7a3c19aa1c6d0cfb1d36849faecbddffe734e9c45a5fd4a1ca1f3d38a6f5f0b4.png