# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
  [SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
from math import floor
from gymnasium.spaces.box import Box
import numpy as np
from irlc.ex09.rl_agent import _masked_actions
from irlc.utils.common import defaultdict2
class FeatureEncoder:
    r"""
    The idea behind linear function approximation of :math:`Q`-values is that

    - We initialize (and eventually learn) a :math:`d`-dimensional weight vector :math:`w \in \mathbb{R}^d`
    - We assume there exists a function to compute a :math:`d`-dimensional feature vector :math:`x(s,a) \in \mathbb{R}^d`
    - The :math:`Q`-values are then represented as

    .. math::
        Q(s,a) = x(s,a)^\top w

    Learning is therefore entirely about updating :math:`w` (a sketch of such an update is given after this class).

    The following example shows how you initialize the linear :math:`Q`-values and compute them in a given state:

    .. runblock:: pycon

        >>> import gymnasium as gym
        >>> from irlc.ex11.feature_encoder import LinearQEncoder
        >>> env = gym.make('MountainCar-v0')
        >>> Q = LinearQEncoder(env, tilings=8)
        >>> s, _ = env.reset()
        >>> a = env.action_space.sample()
        >>> Q(s, a)        # Compute a Q-value.
        >>> Q.d            # Get the number of dimensions
        >>> Q.x(s, a)[:4]  # Get the first four coordinates of the x-vector
        >>> Q.w[:4]        # Get the first four coordinates of the w-vector
    """
    def __init__(self, env):
        """
        Initialize the feature encoder. It requires an environment to know the number of actions and the
        dimension of the state space.

        :param env: A gymnasium ``Env``.
        """
        self.env = env
        self.w = np.zeros((self.d,))
        self._known_masks = {}

        def q_default(s):
            from irlc.utils.common import DiscreteTextActionSpace
            if s in self._known_masks:
                # Only include actions allowed by the (cached) action mask for this state.
                return {a: 0 for a in range(self.env.action_space.n)
                        if self._known_masks[s][(a - self.env.action_space.start) if not isinstance(self.env.action_space, DiscreteTextActionSpace) else a] == 1}
            else:
                return {a: 0 for a in range(self.env.action_space.n)}

        self.q_ = defaultdict2(lambda s: q_default(s))
    @property
    def d(self):
        """
        Get the number of dimensions of :math:`w`

        .. runblock:: pycon

            >>> import gymnasium as gym
            >>> from irlc.ex11.feature_encoder import LinearQEncoder
            >>> env = gym.make('MountainCar-v0')
            >>> Q = LinearQEncoder(env, tilings=8) # Same encoding as Sutton & Barto
            >>> Q.d
        """
        raise NotImplementedError()
    def x(self, s, a):
        """
        Computes the :math:`d`-dimensional feature vector :math:`x(s,a)`

        .. runblock:: pycon

            >>> import gymnasium as gym
            >>> from irlc.ex11.feature_encoder import LinearQEncoder
            >>> env = gym.make('MountainCar-v0')
            >>> Q = LinearQEncoder(env, tilings=8) # Same encoding as Sutton & Barto
            >>> s, info = env.reset()
            >>> x = Q.x(s, env.action_space.sample())

        :param s: A state :math:`s`
        :param a: An action :math:`a`
        :return: Feature vector :math:`x(s,a)`
        """
        raise NotImplementedError()
    def get_Qs(self, state, info_s=None):
        """
        Helper function which returns the available actions in ``state`` together with their Q-values.
        It is only for internal use.

        :param state: The state to evaluate the Q-values in
        :param info_s: The ``info``-dictionary corresponding to ``state`` (used for action masking)
        :return: A tuple ``(actions, Qs)`` of available actions and their Q-values
        """
        if info_s is not None and 'mask' in info_s and not isinstance(state, np.ndarray):
            if state not in self._known_masks:
                self._known_masks[state] = info_s['mask']
                # Restrict the cached Q-values to the actions allowed by the mask.
                avail_actions = _masked_actions(self.env.action_space, info_s['mask'])
                self.q_[state] = {a: self.q_[state][a] for a in avail_actions}

        from irlc.pacman.pacman_environment import PacmanEnvironment
        if isinstance(state, np.ndarray):
            actions = tuple(range(self.env.action_space.n))
        elif isinstance(self.env, PacmanEnvironment):
            actions = _masked_actions(self.env.action_space, info_s['mask'])
            actions = tuple([self.env.action_space.actions[n] for n in actions])
        else:
            actions = tuple(self.q_[state].keys())

        Qs = tuple([self(state, a) for a in actions])
        return actions, Qs
    def get_optimal_action(self, state, info=None):
        r"""
        For a given state ``state``, this function returns the optimal action for that state.

        .. math::
            a^* = \arg\max_a Q(s,a)

        An example:

        .. runblock:: pycon

            >>> from irlc.ex09.rl_agent import TabularAgent
            >>> class MyAgent(TabularAgent):
            ...     def pi(self, s, k, info=None):
            ...         a_star = self.Q.get_optimal_action(s, info)

        :param state: State to find the optimal action in :math:`s`
        :param info: The ``info``-dictionary corresponding to this state
        :return: The optimal action according to the Q-values :math:`a^*`
        """
        actions, Qa = self.get_Qs(state, info)
        if len(actions) == 0:
            raise Exception(f"No available actions in state {state}. Check the action mask.")
        # Add a tiny random perturbation to break ties between equally good actions at random.
        a_ = np.argmax(np.asarray(Qa) + np.random.rand(len(Qa)) * 1e-8)
        return actions[a_]
    def __call__(self, s, a):
        """
        Evaluate the Q-value for the given state and action. An example:

        .. runblock:: pycon

            >>> import gymnasium as gym
            >>> from irlc.ex11.feature_encoder import LinearQEncoder
            >>> env = gym.make('MountainCar-v0')
            >>> Q = LinearQEncoder(env, tilings=8) # Same encoding as Sutton & Barto
            >>> s, info = env.reset()
            >>> Q(s, env.action_space.sample())  # Compute Q(s,a)

        :param s: A state :math:`s`
        :param a: An action :math:`a`
        :return: The Q-value :math:`Q(s,a)`
        """
        return self.x(s, a) @ self.w

    def __getitem__(self, item):
        raise Exception("Hi! You tried to access linear Q-values as Q[s,a]. You need to use Q(s,a). "
                        "This choice signifies they are not represented as a table, but as a linear combination x(s,a)^T w.")

    def __setitem__(self, key, value):
        raise Exception("Oy! You tried to set a linearly encoded Q-value as in Q[s, a] = new_q_value.\n"
                        "This is not possible since they are represented as x(s,a)^T w. Rewrite the expression to update Q.w.")
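# The class docstring above notes that learning is entirely about updating w. The function below is a
# minimal, illustrative sketch (it is not used elsewhere in this file) of a semi-gradient Q-learning step
# with this encoder: since Q(s,a) = x(s,a)^T w, the gradient of Q(s,a) with respect to w is simply x(s,a).
# The function name and the step-size/discount parameters are assumptions made for this example only.
def _example_semi_gradient_update(Q, s, a, r, sp, done, alpha=0.01, gamma=0.99):
    """Perform a single (illustrative) semi-gradient Q-learning update of Q.w in place."""
    if done:
        target = r  # No bootstrapping from a terminal state.
    else:
        _, Qs = Q.get_Qs(sp)
        target = r + gamma * max(Qs)  # Bootstrapped TD target.
    delta = target - Q(s, a)          # TD error.
    Q.w += alpha * delta * Q.x(s, a)  # Gradient of x(s,a)^T w with respect to w is x(s,a).
    return delta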
class DirectEncoder(FeatureEncoder):
    """Encodes x(s,a) by stacking the raw state vector into the a'th block of a zero vector of length d = dim(s) * n_actions."""
    def __init__(self, env):
        self.d_ = np.prod(env.observation_space.shape) * env.action_space.n
        super().__init__(env)

    def x(self, s, a):
        xx = np.zeros((self.d,))
        n = s.size
        xx[n * a:n * (a + 1)] = s  # Place the state in the block corresponding to action a.
        return xx

    @property
    def d(self):
        return self.d_


class GridworldXYEncoder(FeatureEncoder):
    """Encodes a gridworld state s = (x, y) by placing the two coordinates in the a'th block of the feature vector."""
    def __init__(self, env):
        self.env = env
        self.na = self.env.action_space.n
        self.ns = 2
        super().__init__(env)

    @property
    def d(self):
        return self.na * self.ns

    def x(self, s, a):
        x, y = s
        xx = [np.zeros(self.ns) for _ in range(self.na)]
        xx[a][0] = x
        xx[a][1] = y
        return np.concatenate(xx)


class SimplePacmanExtractor(FeatureEncoder):
    """Wraps the hand-crafted Pacman features from irlc.pacman.feature_extractor in the FeatureEncoder interface."""
    def __init__(self, env):
        self.env = env
        from irlc.pacman.feature_extractor import SimpleExtractor
        self._extractor = SimpleExtractor()
        self.fields = ["bias", "#-of-ghosts-1-step-away", "eats-food", "closest-food"]
        super().__init__(env)

    def x(self, s, a):
        xx = np.zeros_like(self.w)
        for k, v in self._extractor.getFeatures(s, a).items():
            xx[self.fields.index(k)] = v
        return xx

    @property
    def d(self):
        return len(self.fields)
class LinearQEncoder(FeatureEncoder):
    def __init__(self, env, tilings=8, max_size=2048):
        r"""
        Implements the tile-encoder described by (SB18).

        :param env: The gymnasium environment we wish to solve.
        :param tilings: Number of tilings (translations). Typically 8.
        :param max_size: Maximum number of dimensions.
        """
        if isinstance(env.observation_space, Box):
            # Continuous state space: use tile coding (see the utilities at the bottom of this file).
            os = env.observation_space
            low = os.low
            high = os.high
            scale = tilings / (high - low)
            hash_table = IHT(max_size)
            self.max_size = max_size

            def tile_representation(s, action):
                s_ = list((s * scale).flat)
                active_tiles = tiles(hash_table, tilings, s_, [action])
                return active_tiles

            self.get_active_tiles = tile_representation
        else:
            # Discrete state space: use a Fixed Sparse Representation (one active index per state component). See:
            # https://castlelab.princeton.edu/html/ORF544/Readings/Geramifard%20-%20Tutorial%20on%20linear%20function%20approximations%20for%20dynamic%20programming%20and%20RL.pdf
            ospace = env.observation_space
            simple = False
            if not isinstance(ospace, tuple):
                ospace = (ospace,)
                simple = True
            sz = []
            for j, disc in enumerate(ospace):
                sz.append(disc.n)
            total_size = sum(sz)
            csum = np.cumsum(sz) - sz[0]
            self.max_size = total_size * env.action_space.n

            def fixed_sparse_representation(s, action):
                if simple:
                    s = (s,)
                # For a single Discrete(n) space this reduces to the index s + n * action, i.e. a
                # one-hot encoding over (state, action) pairs.
                s_encoded = [cs + ds + total_size * action for ds, cs in zip(s, csum)]
                return s_encoded

            self.get_active_tiles = fixed_sparse_representation
        super().__init__(env)

    @property
    def d(self):
        # The feature dimension equals the maximum number of active-tile indices the encoder can produce.
        return self.max_size

    def x(self, s, a):
        # The feature vector is a sparse binary vector with a 1 at each active tile/index.
        x = np.zeros(self.d)
        at = self.get_active_tiles(s, a)
        x[at] = 1.0
        return x
"""Following code contains the tile-coding utilities copied from:http://incompleteideas.net/tiles/tiles3.py-remove"""classIHT:"""Structure to handle collisions"""def__init__(self,size_val):self.size=size_valself.overfull_count=0self.dictionary={}defcount(self):returnlen(self.dictionary)deffull(self):returnlen(self.dictionary)>=self.sizedefget_index(self,obj,read_only=False):d=self.dictionaryifobjind:returnd[obj]elifread_only:returnNonesize=self.sizecount=self.count()ifcount>=size:ifself.overfull_count==0:print('IHT full, starting to allow collisions')self.overfull_count+=1returnhash(obj)%self.sizeelse:d[obj]=countreturncountdefhash_coords(coordinates,m,read_only=False):ifisinstance(m,IHT):returnm.get_index(tuple(coordinates),read_only)ifisinstance(m,int):returnhash(tuple(coordinates))%mifmisNone:returncoordinatesdeftiles(iht_or_size,num_tilings,floats,ints=None,read_only=False):"""returns num-tilings tile indices corresponding to the floats and ints"""ifintsisNone:ints=[]qfloats=[floor(f*num_tilings)forfinfloats]tiles=[]fortilinginrange(num_tilings):tilingX2=tiling*2coords=[tiling]b=tilingforqinqfloats:coords.append((q+b)//num_tilings)b+=tilingX2coords.extend(ints)tiles.append(hash_coords(coords,iht_or_size,read_only))returntiles