Exercise 2: Dynamical Programming#

Note

  • The exercises material is divided into general information (found on this page) and the actual exercise instructions. You can download this weeks exercise instructions from here:

  • You are encouraged to prepare the homework problems 1, 2 (indicated by a hand in the PDF file) at home and present your solution during the exercise session.

  • To get the newest version of the course material, please see Making sure your files are up to date

This week will be concerned with dynamical programming. Dynamical programming is a technique that uses a model of the environment to create an optimal plan. This means there are now 3 classes to content with:

The environment

Represented by an OpenAI gymnasium Env class. This class represents the problem we are trying to solve and has e.g. a step()-method

The agent

Represented as a Agent class. This class represents the agent which which acts in the environment. In our case, the agent should plan optimally using dynamical programming and the policy method pi(), should simply execute each step of that plan

The model

This class (DPModel) represents the dynamical programming model the agent use to plan. It must implement python functions equivalent to \(f_k(x_k, u_k, w_k)\), \(g_k(x_k, u_k, w_k)\) and so on. The agent should use the model clas to plan, and not the environment.

The dynamical programming model#

Using a class to represent a model can be confusing, but since we will use the same trick in control it is worth considering why and how we choose to represent the model this way.

The model is basically just a collection of functions. For instance, consider these functions corresponding to \(f_k\) and \(g_k\):

>>> def f(x, u, w, k):
...     return x+u-w #Compute and return x_{k+1} here
... 
>>> def g(x, u, w, k):
...     return x**2 + u**2 #Compute and return g_k(x, u, w) here
... 

A model is then just a class that collects the function as so:

>>> class MyModel:
...     def f(self, x, u, w, k):
...         return x+u-w #Compute and return x_{k+1} here
...     def g(self, x, u, w, k):
...         return x**2 + u**2 #Compute and return g_k(x, u, w) here
... 
>>> model = MyModel()
>>> print(model.g(2, 1, 0, 0))
5

Note

signature means the name and order of variables

What the class DPModel (see below) does is just to specify the signature of the functions f, g, etc. shown above. This helps us to specify the model in the same way every time, and is a very common technique in programming. In other words, the class DPModel itself is very simple, and looks something like this:

>>> class SimplifiedDPModel:
...     def f(self, x, u, w, k):
...         raise NotImplementedError("Implement f_k here")
...     def g(self, x, u, w, k):
...         raise NotImplementedError("Implement f_k here")
... 

And when we actually want to implement a specific model, for instance of the inventory environment, we can do that as follows (see (Herlau [Her24], Subsection 5.1.2)):

# inventory.py
class InventoryDPModel(DPModel): 
    def __init__(self, N=3):
        super().__init__(N=N)

    def A(self, x, k): # Action space A_k(x)
        return {0, 1, 2}

    def S(self, k): # State space S_k
        return {0, 1, 2}

    def g(self, x, u, w, k): # Cost function g_k(x,u,w)
        return u + (x + u - w) ** 2

    def f(self, x, u, w, k): # Dynamics f_k(x,u,w)
        return max(0, min(2, x + u - w ))

    def Pw(self, x, u, k): # Distribution over random disturbances 
        return {0:.1, 1:.7, 2:0.2}

    def gN(self, x):
        return 0 

And the model can be used as:

>>> from irlc.ex02.inventory import InventoryDPModel
>>> model = InventoryDPModel(N=5) # Plan over a horizon of N=5.
>>> model.g(x=1, u=1, w=0, k=0) # Computes g_0(x=1, u=1, w=0)
5
>>> print("State space at time k=0", model.S(0))
State space at time k=0 {0, 1, 2}

This means that when you implement the DP algorithm, you only need to refer to functions such as model.S, model.f, and so on without thinking about any specific environment – if you use the functions correctly, your implementation will work with any specific model that matches the signature in the DPModel class!.

The following example shows the start of the DP algorithm implementation. It represents the cost functions \(J_k`\) as a list of dictionaries J, so that J[N][x] corresponds to the terminal cost \(J_N(x)\).

>>> def very_simplified_dp(model):
...     N = model.N                                  # Planning horizon
...     print("The planning horizon is", N)
...     print("The state space is", model.S(0))
...     J = [dict() for k in range(N+1)]             # Initialize J_k = [J_0, J_1, ..., J_N]
...     J[N] = {x: model.gN(x) for x in model.S(N) } # Initialize J_N by looping over terminal states model.S(N)
...     return J                                     # Insert the rest of the DP algorithm here.
... 
>>> from irlc.ex02.inventory import InventoryDPModel
>>> model = InventoryDPModel(N=4)
>>> J = very_simplified_dp(model)                   # Call the DP algorithm and get J back.
The planning horizon is 4
The state space is {0, 1, 2}
>>> print(J)
[{}, {}, {}, {}, {0: 0, 1: 0, 2: 0}]

A primer on Pacman#

In the first project, you will work with the Pacman game. I have included a more complete description of the Pacman game at Pacman.

Classes and functions#

This section provides an overview of the most important classes and functions:

class irlc.ex02.dp_model.DPModel(N)[source]#

The Dynamical Programming model class

The purpose of this class is to translate a dynamical programming problem, defined by the equations,

\[\begin{split}x_{k+1} & = f_k(x_k, u_k, w_k) \\ \text{cost} & = g_k(x_k, u_k, w_k) \\ \text{terminal cost} & = g_N(x_N) \\ \text{Noise disturbances:} \quad w_k & \sim P_W(w_k | x_k, u_k) \\ \text{State/action spaces:} \quad & \mathcal A_k(x_k), \mathcal S_k\end{split}\]

into a single python object which we can then use for planning.

Note

This is the first time many of you encounter a class. If so, you might wonder why you can’t just implement the functions as usual, i.e. def f(x, k, ...):, def g(x, k, ...):, as regular python function and just let that be it?

The reason is that we want to pass all these function (which taken together represents a planning problem) to planning methods such as the DP-algorithm (see the function DP_stochastic()) all at once. It is not very convenient to pass the functions one at a time – instead we collect them into a class and simply call the function as

>>> from irlc.ex02.inventory import InventoryDPModel
>>> from irlc.ex02.dp import DP_stochastic
>>> model = InventoryDPModel()      # Intialize the model
>>> J, pi = DP_stochastic(model)    # All functions are passed to DP_stochastic

To actually use the model, you need to extend it and implement the methods. The basic recipe for this is something like:

class MyDPModel(DPModel):
    def f(self, x, u, w, k): # Note the `self`-variable. You can use it to access class variables such as`self.N`.
        return x + u - w     # Just an example
    def S(self, k):
        return [0, 1, 2]    # State space S_k = {0, 1, 2}
        # Implement the other functions A, g, gN and Pw here.

You should take a look at InventoryDPModel() for a concrete example. Once the functions have been implemented, you can call them as:

>>> from irlc.ex02.inventory import InventoryDPModel
>>> model = InventoryDPModel(N=5) # Plan on a horizon of 5
>>> print("State space S_2", model.S(2))
State space S_2 {0, 1, 2}
>>> model.f(x=1, u=2, w=1, k=0)   # Just an example. You don't have to use named arguments, although it helps on readability.
2
>>> model.A(1, k=2) # Action space A_1(2), i.e. the actions available at time step k=1 in state 2.
{0, 1, 2}
__init__(N)[source]#

Called when the DP Model is initialized. By default, it simply stores the planning horizon N

Parameters:

N – The planning horizon in the DP problem \(N\)

f(x, u, w, k)[source]#

Implements the transition function \(x_{k+1} = f_k(x, u, w)\) and returns the next state \(x_{k+1}\)

Parameters:
  • x – The state \(x_k\)

  • u – The action taken \(u_k\)

  • w – The random noise disturbance \(w_k\)

  • k (int) – The current time step \(k\)

Returns:

The state the environment (deterministically) transitions to, i.e. \(x_{k+1}\)

g(x, u, w, k)[source]#

Implements the cost function \(c = g_k(x, u, w)\) and returns the cost \(c\)

Parameters:
  • x – The state \(x_k\)

  • u – The action taken \(u_k\)

  • w – The random noise disturbance \(w_k\)

  • k (int) – The current time step \(k\)

Return type:

float

Returns:

The cost (as a float) incurred by the environment, i.e. \(g_k(x, u, w)\)

gN(x)[source]#

Implements the terminal cost function \(c = g_N(x)\) and returns the terminal cost \(c\).

Parameters:

x – A state seen at the last time step \(x_N\)

Return type:

float

Returns:

The terminal cost (as a float) incurred by the environment, i.e. \(g_N(x)\)

S(k)[source]#

Computes the state space \(\mathcal S_k\) at time step \(k\). In other words, this function returns a set of all states the system can possibly be in at time step \(k\).

Note

I think the cleanest implementation is one where this function returns a python set. However, it won’t matter if the function returns a list or tuple instead.

Parameters:

k (int) – The current time step \(k\)

Returns:

The state space (as a list or set) available at time step k, i.e. \(\mathcal S_k\)

A(x, k)[source]#

Computes the action space \(\mathcal A_k(x)\) at time step \(k\) in state x.

In other words, this function returns a set of all actions the agent can take in time step \(k\).

Note

An example where the actions depend on the state is chess (in this case, the state is board position, and the actions are the legal moves)

Parameters:
  • k (int) – The current time step \(k\)

  • x – The state we want to compute the actions in \(x_k\)

Returns:

The action space (as a list or set) available at time step k, i.e. \(\mathcal A_k(x_k)\)

Pw(x, u, k)[source]#

Returns the random noise disturbances and their probability. In other words, this function implements the distribution:

\[P_k(w_k | x_k, u_k)\]

To implement this distribution, we must keep track of both the possible values of the noise disturbances \(w_k\) as well as the (numerical) value of their probability \(p(w_k| ...)\).

To do this, the function returns a dictionary of the form P = {w1: p_w1, w2: p_w2, ...} where

  • The keys w represents random noise disturbances

  • the values P[w] represents their probability (i.e. a float)

This can hopefully be made more clear with the Inventory environment:

>>> from irlc.ex02.inventory import InventoryDPModel
>>> model = InventoryDPModel(N=5) # Plan on a horizon of 5
>>> print("Random noise disturbances in state x=1 using action u=0 is:", model.Pw(x=1, u=0, k=0))
Random noise disturbances in state x=1 using action u=0 is: {0: 0.1, 1: 0.7, 2: 0.2}
>>> for w, pw in model.Pw(x=1, u=0, k=0).items(): # Iterate and print:
...     print(f"p_k({w}|x, u) =", pw)
... 
p_k(0|x, u) = 0.1
p_k(1|x, u) = 0.7
p_k(2|x, u) = 0.2
Parameters:
  • x – The state \(x_k\)

  • u – The action taken \(u_k\)

  • k (int) – The current time step \(k\)

Returns:

A dictionary representing the distribution of random noise disturbances \(P_k(w |x_k, u_k)\) of the form {..., w_i: pw_i, ...} such that pw_i = P_k(w_i | x, u)

w_rnd(x, u, k)[source]#

This helper function computes generates a random noise disturbance using the function irlc.ex02.dp_model.DPModel.Pw(), i.e. it returns a sample:

\[w \sim P_k(x_k, u_k)\]

This will be useful for simulating the model.

Note

You don’t have to implement or change this function.

Parameters:
  • x – The state \(x_k\)

  • u – The action taken \(u_k\)

  • k – The current time step \(k\)

Returns:

A random noise disturbance \(w\) distributed as \(P_k(x_k, u_k)\)

irlc.ex02.dp.DP_stochastic(model)[source]#

Implement the stochastic DP algorithm. The implementation follows (Her24, Algorithm 1). Once you are done, you should be able to call the function as:

>>> from irlc.ex02.graph_traversal import SmallGraphDP
>>> from irlc.ex02.dp import DP_stochastic
>>> model = SmallGraphDP(t=5)  # Instantiate the small graph with target node 5
>>> J, pi = DP_stochastic(model)
>>> print(pi[0][2]) # Action taken in state ``x=2`` at time step ``k=0``.
3
Parameters:

model (DPModel) – An instance of irlc.ex02.dp_model.DPModel class. This represents the problem we wish to solve.

Returns:

  • J - A list of of cost function so that J[k][x] represents \(J_k(x)\)

  • pi - A list of dictionaries so that pi[k][x] represents \(\mu_k(x)\)

Solutions to selected exercises#