Source code for irlc.ex02.dp_model

# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import numpy as np

[docs] class DPModel: r""" The Dynamical Programming model class The purpose of this class is to translate a dynamical programming problem, defined by the equations, .. math:: x_{k+1} & = f_k(x_k, u_k, w_k) \\ \text{cost} & = g_k(x_k, u_k, w_k) \\ \text{terminal cost} & = g_N(x_N) \\ \text{Noise disturbances:} \quad w_k & \sim P_W(w_k | x_k, u_k) \\ \text{State/action spaces:} \quad & \mathcal A_k(x_k), \mathcal S_k into a single python object which we can then use for planning. .. Note:: This is the first time many of you encounter a class. If so, you might wonder why you can't just implement the functions as usual, i.e. ``def f(x, k, ...):``, ``def g(x, k, ...):``, as regular python function and just let that be it? The reason is that we want to pass all these function (which taken together represents a planning problem) to planning methods such as the DP-algorithm (see the function :func:`~irlc.ex02.dp.DP_stochastic`) all at once. It is not very convenient to pass the functions one at a time -- instead we collect them into a class and simply call the function as >>> from irlc.ex02.inventory import InventoryDPModel >>> from irlc.ex02.dp import DP_stochastic >>> model = InventoryDPModel() # Intialize the model >>> J, pi = DP_stochastic(model) # All functions are passed to DP_stochastic To actually use the model, you need to extend it and implement the methods. The basic recipe for this is something like:: class MyDPModel(DPModel): def f(self, x, u, w, k): # Note the `self`-variable. You can use it to access class variables such as`self.N`. return x + u - w # Just an example def S(self, k): return [0, 1, 2] # State space S_k = {0, 1, 2} # Implement the other functions A, g, gN and Pw here. You should take a look at :func:`~irlc.ex02.inventory.InventoryDPModel` for a concrete example. Once the functions have been implemented, you can call them as: .. runblock:: pycon >>> from irlc.ex02.inventory import InventoryDPModel >>> model = InventoryDPModel(N=5) # Plan on a horizon of 5 >>> print("State space S_2", model.S(2)) >>> model.f(x=1, u=2, w=1, k=0) # Just an example. You don't have to use named arguments, although it helps on readability. >>> model.A(1, k=2) # Action space A_1(2), i.e. the actions available at time step k=1 in state 2. """
[docs] def __init__(self, N): """ Called when the DP Model is initialized. By default, it simply stores the planning horizon ``N`` :param N: The planning horizon in the DP problem :math:`N` """ self.N = N # Store the planning horizon.
[docs] def f(self, x, u, w, k: int): """ Implements the transition function :math:`x_{k+1} = f_k(x, u, w)` and returns the next state :math:`x_{k+1}` :param x: The state :math:`x_k` :param u: The action taken :math:`u_k` :param w: The random noise disturbance :math:`w_k` :param k: The current time step :math:`k` :return: The state the environment (deterministically) transitions to, i.e. :math:`x_{k+1}` """ raise NotImplementedError("Return f_k(x,u,w)")
[docs] def g(self, x, u, w, k: int) -> float: """ Implements the cost function :math:`c = g_k(x, u, w)` and returns the cost :math:`c` :param x: The state :math:`x_k` :param u: The action taken :math:`u_k` :param w: The random noise disturbance :math:`w_k` :param k: The current time step :math:`k` :return: The cost (as a ``float``) incurred by the environment, i.e. :math:`g_k(x, u, w)` """ raise NotImplementedError("Return g_k(x,u,w)")
[docs] def gN(self, x) -> float: """ Implements the terminal cost function :math:`c = g_N(x)` and returns the terminal cost :math:`c`. :param x: A state seen at the last time step :math:`x_N` :return: The terminal cost (as a ``float``) incurred by the environment, i.e. :math:`g_N(x)` """ raise NotImplementedError("Return g_N(x)")
[docs] def S(self, k: int): r""" Computes the state space :math:`\mathcal S_k` at time step :math:`k`. In other words, this function returns a set of all states the system can possibly be in at time step :math:`k`. .. Note:: I think the cleanest implementation is one where this function returns a python ``set``. However, it won't matter if the function returns a ``list`` or ``tuple`` instead. :param k: The current time step :math:`k` :return: The state space (as a ``list`` or ``set``) available at time step ``k``, i.e. :math:`\mathcal S_k` """ raise NotImplementedError("Return state space as set S_k = {x_1, x_2, ...}")
[docs] def A(self, x, k: int): r""" Computes the action space :math:`\mathcal A_k(x)` at time step :math:`k` in state `x`. In other words, this function returns a ``set`` of all actions the agent can take in time step :math:`k`. .. Note:: An example where the actions depend on the state is chess (in this case, the state is board position, and the actions are the legal moves) :param k: The current time step :math:`k` :param x: The state we want to compute the actions in :math:`x_k` :return: The action space (as a ``list`` or ``set``) available at time step ``k``, i.e. :math:`\mathcal A_k(x_k)` """ raise NotImplementedError("Return action space as set A(x_k) = {u_1, u_2, ...}")
[docs] def Pw(self, x, u, k: int): """ Returns the random noise disturbances and their probability. In other words, this function implements the distribution: .. math:: P_k(w_k | x_k, u_k) To implement this distribution, we must keep track of both the possible values of the noise disturbances :math:`w_k` as well as the (numerical) value of their probability :math:`p(w_k| ...)`. To do this, the function returns a dictionary of the form ``P = {w1: p_w1, w2: p_w2, ...}`` where - The keys ``w`` represents random noise disturbances - the values ``P[w]`` represents their probability (i.e. a ``float``) This can hopefully be made more clear with the Inventory environment: .. runblock:: pycon >>> from irlc.ex02.inventory import InventoryDPModel >>> model = InventoryDPModel(N=5) # Plan on a horizon of 5 >>> print("Random noise disturbances in state x=1 using action u=0 is:", model.Pw(x=1, u=0, k=0)) >>> for w, pw in model.Pw(x=1, u=0, k=0).items(): # Iterate and print: ... print(f"p_k({w}|x, u) =", pw) :param x: The state :math:`x_k` :param u: The action taken :math:`u_k` :param k: The current time step :math:`k` :return: A dictionary representing the distribution of random noise disturbances :math:`P_k(w |x_k, u_k)` of the form ``{..., w_i: pw_i, ...}`` such that ``pw_i = P_k(w_i | x, u)`` """ # Compute and return the random noise disturbances here. # As an example: return {'w_dummy': 1/3, 42: 2/3} # P(w_k="w_dummy") = 1/3, P(w_k =42)=2/3.
[docs] def w_rnd(self, x, u, k): r""" This helper function computes generates a random noise disturbance using the function :func:`irlc.ex02.dp_model.DPModel.Pw`, i.e. it returns a sample: .. math:: w \sim P_k(x_k, u_k) This will be useful for simulating the model. .. Note:: You don't have to implement or change this function. :param x: The state :math:`x_k` :param u: The action taken :math:`u_k` :param k: The current time step :math:`k` :return: A random noise disturbance :math:`w` distributed as :math:`P_k(x_k, u_k)` """ pW = self.Pw(x, u, k) w, pw = zip(*pW.items()) # seperate w and p(w) return np.random.choice(a=w, p=pw)