# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.fromgymnasiumimportloggerfromirlc.ex01.agentimportAgentimporttimeimportsysimportgymnasiumasgymimportosimportasyncioimportnumpyasnpHUMAN_REQUEST_RESET='reset the environment'try:# Using this backend apparently clash with scientific mode. Not sure why it was there in the first place so# disabling it for now.# matplotlib.use('TkAgg')importmatplotlib.pyplotaspltexceptImportErrorase:logger.warn('failed to set matplotlib backend, plotting will not work: %s'%str(e))plt=Nonetry:importpygamefrompygame.eventimportEventexceptImportErrorase:logger.warn('failed to import pygame. Many of the interactive visualizations will not work: %s'%str(e))Event=NoneclassAgentWrapper(Agent):"""Wraps the environment to allow a modular transformation. This class is the base class for all wrappers. The subclass could override some methods to change the behavior of the original environment without touching the original code. .. note:: Don't forget to call ``super().__init__(env)`` if the subclass overrides :meth:`__init__`. """def__init__(self,agent,env):self.agent=agentself.env=envdef__getattr__(self,name):ifname.startswith('_'):raiseAttributeError("attempted to get missing private attribute '{}'".format(name))returngetattr(self.agent,name)@classmethoddefclass_name(cls):returncls.__name__defpi(self,state,k,info=None):returnself.agent.pi(state,k,info)# return self.env.step(action)deftrain(self,*args,**kwargs):returnself.agent.train(*args,**kwargs)def__str__(self):return'<{}{}>'.format(type(self).__name__,self.agent)def__repr__(self):returnstr(self)@propertydefunwrapped(self):returnself.agent.unwrappedPAUSE_KEY=ord('p')SPACEBAR="_SPACE_BAR_PRESSED_"HUMAN_DEMAND_RESET="reset env."asyncdef_webassembly_interactive(env,agent,autoplay=False):frompygameimportgfxdrawimportnumpyasnpdefaapolygon(surface,points,color):pygame.draw.polygon(surface,color,points)deffilled_polygon(surface,points,color):pygame.draw.polygon(surface,color,points,width=0)defaacircle(surface,x,y,r,color):pygame.draw.circle(surface,color,(x,y),r,width=1)deffilled_circle(surface,x,y,r,color):pygame.draw.circle(surface,color,(x,y),r,width=0)gfxdraw.aapolygon=aapolygongfxdraw.filled_polygon=filled_polygongfxdraw.aacircle=aacirclegfxdraw.filled_circle=filled_circle# from irlc.utils.player_wrapper import AsyncPlayerWrapperPygameagent=AsyncPlayerWrapperPygame(agent,env,autoplay=autoplay)s,info=env.reset()k=0awaitasyncio.sleep(0.2)# set up graphics.running=Truestep=0whilerunning:pi_action=agent.pi(s,k,info)a=NonewhileTrue:foreventinpygame.event.get():# if event.type == pygame.KEYDOWN and event.key == pygame.K_r:# print("Resetting the environment")# a = HUMAN_DEMAND_RESET# breaka=agent._resolve_event_into_action(event=event,pi_action=pi_action,info=info)ifaisnotNone:breakifagent.human_demand_autoplay:a=pi_actionbreakifaisnotNone:breakawaitasyncio.sleep(0.05)# if not isinstance(a, np.ndarray) and isinstance(a, str) and a == HUMAN_DEMAND_RESET:# s, info = env.reset()# continuesp,reward,done,truncated,info_sp=(awaitenv.async_step(a))ifhasattr(env,'async_step')elseenv.step(a)agent.train(s,a,reward,sp,done=done,info_s=info,info_sp=info_sp)step=step+1k=k+1ifdoneortruncated:sp,info_sp=env.reset()k=0s=spinfo=info_spawaitasyncio.sleep(0.01)# release loop.classPlayWrapperPygame(AgentWrapper):render_after_train=TrueACTION_FORCE_RESET="force reset of environment."def__init__(self,agent:Agent,env:gym.Env,keys_to_action=None,autoplay=False):super().__init__(agent,env)ifkeys_to_actionisNone:ifhasattr(env,'get_keys_to_action'):keys_to_action=env.get_keys_to_action()elifhasattr(env,"env")andhasattr(env.env,'get_keys_to_action'):keys_to_action=env.env.get_keys_to_action()elifhasattr(env.unwrapped,'get_keys_to_action'):keys_to_action=env.unwrapped.get_keys_to_action()else:keys_to_action=dict()# print(env.spec.id +" does not have explicit key to action mapping, please specify one manually")# assert False, env.spec.id + " does not have explicit key to action mapping, " + \# "please specify one manually"# keys_to_action = dict()self.keys_to_action=keys_to_actionself.env=envself.human_wants_restart=Falseself.human_sets_pause=Falseself.human_agent_action=-1self.human_demand_autoplay=autoplay# Now for the more fucky stuff. Collect internal statistics and such.self.env._interactive_data=dict(trajectories=[],in_terminal_state=False)# Now fix the train functiontrain2=agent.trainreset2=env.resetdefreset_(**kwargs):fromirlc.ex01.agentimportTrajectorys,info=reset2(**kwargs)env._interactive_data['trajectories'].append(Trajectory(state=[s],time=[0],action=[],reward=[],env_info=[info]))env._interactive_data['in_terminal_state']=Falsereturns,infodeftrain_(s,a,r,sp,done,info_s,info_sp):ifnotisinstance(a,str)ora!=PlayWrapperPygame.ACTION_FORCE_RESET:train2(s,a,r,sp,done,info_s,info_sp)env._interactive_data['trajectories'][-1].state.append(sp)env._interactive_data['trajectories'][-1].reward.append(r)env._interactive_data['trajectories'][-1].action.append(a)env._interactive_data['trajectories'][-1].env_info.append(info_sp)ifdone:env._interactive_data['in_terminal_state']=Trueenv._interactive_data['avg_reward_per_episode']=np.mean([sum(t.reward)fortinenv._interactive_data['trajectories']])env._interactive_data['completed_episodes']=len(env._interactive_data['trajectories'])ifself.render_after_train:# always true.env.render()step2=env.stepdefstep_(a):ifisinstance(a,str)anda==PlayWrapperPygame.ACTION_FORCE_RESET:print("Resetting interaction data.")self.env._interactive_data=dict(trajectories=[],in_terminal_state=False)return"done",3,True,True,{}else:sp,r,terminated,truncated,info=step2(a)returnsp,r,terminated,truncated,infoagent.train=train_env.agent=agentenv.unwrapped.agent=agentenv.reset=reset_env.step=step_def_resolve_event_into_action(self,event:pygame.event.Event,pi_action,info:dict):# assert False # Unclear if used.ifevent.type==pygame.QUIT:ifhasattr(self,'env'):self.env.close()time.sleep(0.1)pygame.display.quit()time.sleep(0.1)pygame.quit()time.sleep(0.1)sys.exit()# checking if keydown event happened or notifevent.type==pygame.KEYDOWN:# if event.key == pygame.K_f:# print("Pressing f")# self.env.render()# return Noneifevent.key==pygame.K_SPACE:a=pi_actionreturnaelif(event.key,)inself.keys_to_action:a=self.keys_to_action[(event.key,)]ifinfoisnotNoneand'mask'ininfo:fromirlc.utils.commonimportDiscreteTextActionSpaceifisinstance(self.env.action_space,DiscreteTextActionSpace):aint=self.env.action_space.actions.index(a)else:aint=aifinfo['mask'][aint]==0:# The action was masked. This means that this action is unavailable, and we should select another.# The default is to select one of the available actions from the mask.fromirlc.pacman.gamestateimportGameStatefromirlc.pacman.pacman_environmentimportPacmanEnvironmentifisinstance(self.env,PacmanEnvironment):a="Stop"else:a=info['mask'].argmax()ifisinstance(self.env.action_space,DiscreteTextActionSpace):a=self.env.action_space.actions[a]returnaelse:returnaelifevent.key==pygame.K_r:print("Pressing r")ifhasattr(self,'reset'):self.reset()returnPlayWrapperPygame.ACTION_FORCE_RESETelifevent.key==pygame.K_f:print("Pressing f")self.env.render()elifevent.key==pygame.K_p:# unpause# print("Unpausing game")self.human_demand_autoplay=notself.human_demand_autoplay# print(f"Unpausing game {self.human_demand_autoplay=}")ifself.human_demand_autoplay:# print("Returning", pi_action)a=pi_actionreturnaelse:# try to pass event on to the game.ifhasattr(self.env,'keypress'):self.env.keypress(event)returnNonedefpi(self,state,k,info=None):# print("Entering pi")pi_action=super().pi(state,k,info)# make sure super class pi method is called in case it has side effects.a=NonewhileTrue:# print(f"while loop {self.human_demand_autoplay=}")foreventinpygame.event.get():# print("resolving", event)a=self._resolve_event_into_action(event,pi_action,info)# print("Resolved", a)ifaisnotNone:# print("Breaking", a)break# if False:# if event.type == pygame.QUIT:# if hasattr(self, 'env'):# self.env.close()# time.sleep(0.1)# pygame.display.quit()# time.sleep(0.1)# pygame.quit()# time.sleep(0.1)# sys.exit()## # checking if keydown event happened or not# if event.type == pygame.KEYDOWN:# if event.key == pygame.K_SPACE:# a = pi_action# break# elif (event.key,) in self.keys_to_action:# a = self.keys_to_action[(event.key,)]# if info is not None and 'mask' in info:# from irlc.utils.common import DiscreteTextActionSpace# if isinstance(self.env.action_space, DiscreteTextActionSpace):# aint = self.env.action_space.actions.index(a)# else:# aint = a## if info['mask'][aint] == 0:# # The action was masked. This means that this action is unavailable, and we should select another.# # The default is to select one of the available actions from the mask.# a = info['mask'].argmax()# if isinstance(self.env.action_space, DiscreteTextActionSpace):# a = self.env.action_space.actions[a]# break# else:# break# elif event.key == pygame.K_r:# print("Pressing r")# if hasattr(self, 'reset'):# return PlayWrapperPygame.ACTION_FORCE_RESET# ## # self.reset()# # self.env.reset()# ## # self.env.render()# elif event.key == pygame.K_f:# print("Pressing f")# self.env.render()## elif event.unicode == 'p':# # unpause# self.human_demand_autoplay = not self.human_demand_autoplay# break# else:# # try to pass event on to the game.# if hasattr(self.env, 'keypress'):# self.env.keypress(event)ifself.human_demand_autoplay:a=pi_actionifaisnotNone:try:fromirlc.pacman.gamestateimportGameStateifisinstance(state,GameState):ifanotinstate.A():a="Stop"exceptExceptionase:pass# print(f"{a=}")returnatime.sleep(0.1)classAsyncPlayerWrapperPygame(PlayWrapperPygame):# render_after_train = Falsedefpi(self,state,k,info=None):pi_action=self.agent.pi(state,k,info)# make sure super class pi method is called in case it has side effects.returnpi_actiondef_post_process_action_for_visualization(self):pass
[docs]definteractive(env:gym.Env,agent:Agent,autoplay=False)->(gym.Env,Agent):""" This function is used for visualizations. It can - Allow you to input keyboard commands to an environment - Allow you to save results - Visualize reinforcement-learning agents in the gridworld environment. by adding a single extra line ``env, agent = interactive(env,agent)``. The following shows an example: >>> from irlc.gridworld.gridworld_environments import BookGridEnvironment >>> from irlc import train, Agent, interactive >>> env = BookGridEnvironment(render_mode="human", zoom=0.8) # Pass render_mode='human' for visualization. >>> env, agent = interactive(env, Agent(env)) # Make the environment interactive. Note that it needs an agent. >>> train(env, agent, num_episodes=2) # You can train and use the agent and environment as usual. >>> env.close() It also enables you to visualize the environment at a matplotlib figure or save it as a pdf file using ``env.plot()`` and ``env.savepdf('my_file.pdf)``. All demos and figures in the notes are made using this function. :param env: A gym environment (an instance of the ``Env`` class) :param agent: An agent (an instance of the ``Agent`` class) :param autoplay: Whether the simulation should be unpaused automatically :return: An environment and agent which have been slightly updated to make them interact with each other. You can use them as usual with the ``train``-function. """fromPILimportImage# Let's put this one here in case we run the code in headless mode.agent=PlayWrapperPygame(agent,env,autoplay=autoplay)passdefplot():env.render_mode,rmt='rgb_array',env.render_modeframe=env.render()env.render_mode=rmtim=Image.fromarray(frame)plt.imshow(im)plt.axis('off')plt.axis('off')plt.tight_layout()defsavepdf(file):env.render_mode,rmt='rgb_array',env.render_modeframe=env.render()env.render_mode=rmtim=Image.fromarray(frame)snapshot_base=fileifsnapshot_base.endswith(".png"):sf=snapshot_base[:-4]fext='png'else:fext='pdf'ifsnapshot_base.endswith(".pdf"):sf=snapshot_base[:-4]else:sf=snapshot_basesf=f"{sf}.{fext}"dn=os.path.dirname(sf)iflen(dn)>0andnotos.path.isdir(dn):os.makedirs(dn)print("Saving snapshot of environment to",os.path.abspath(sf))iffext=='png':im.save(sf)fromirlcimport_move_to_output_directory_move_to_output_directory(sf)else:plt.figure(figsize=(16,16))plt.imshow(im)plt.axis('off')plt.tight_layout()fromirlcimportsavepdfsavepdf(sf,verbose=True)plt.show()env.plot=plotenv.savepdf=savepdfreturnenv,agent
defmain():fromirlc.ex11.q_agentimportQAgentfromirlc.gridworld.gridworld_environmentsimportBookGridEnvironmentfromirlcimporttrain,Agentenv=BookGridEnvironment(render_mode="human",zoom=0.8)# Pass render_mode='human' for visualization.env,agent=interactive(env,Agent(env))# Make thenv.reset()# We always need to call resetenv.plot()# Plot the environment.env.close()# Interaction with a random agent.fromirlc.gridworld.gridworld_environmentsimportBookGridEnvironmentfromirlcimporttrain,Agentenv=BookGridEnvironment(render_mode="human",zoom=0.8)# Pass render_mode='human' for visualization.env,agent=interactive(env,Agent(env))# Make the environment interactive. Note that it needs an agent.train(env,agent,num_episodes=100)# You can train and use the agent and environment as usual. env.close()if__name__=="__main__":main()