개요
Advantage Actor-Critic(A2C) 알고리즘을 PyTorch 코드로 구현해보았으며 Colab에서 코드 실행 및 결과 확인이 가능합니다.
설명
Advantage Actor-Critic 알고리즘은 Actor-Critic에 Advantage function 개념이 적용되어 보다 안정적인 학습이 가능한 알고리즘입니다. 
해당 계산을 위해 직접 action-value function(Q)을 학습시켜서 사용할수도 있지만, 여러 비효율적인 부분이 있어 state-value function(V)를 활용해 estimate하는 방법을 사용합니다.
이번 구현에서는 Generalized Advantage Estimation (GAE)를 사용해서 계산하였습니다.
간략한 연산 흐름은 다음과 같습니다.
1.self-play로 trajectory 모으기. (a,r,v,v_next)
2.advantage function 계산
3.value target 계산
4.loss계산 및 파라미터 업데이트
    def self_play_A2C(self, max_timestep=1000000):
        game_score = 0
        state = self.env.reset() # env 시작
        for _ in range(max_timestep):
            output = self.P(torch.from_numpy(state).float().to(device)) # inference
            inferenced_v = self.V(torch.from_numpy(state).float().to(device))
            prob_distribution = Categorical(output) #확률분포 표현
            action = prob_distribution.sample() #확률분포로부터 action 선택
            state, r, done, _ = self.env.step(action.item()) # env 진행  
            with torch.no_grad():
                inferenced_v_from_next_s = self.V(torch.from_numpy(state).float().to(device))
            if done==True:
                inferenced_v_from_next_s = 0
             # a,r,v,v_next 저장 
            self.trajectory.append((prob_distribution.log_prob(action), r, inferenced_v, inferenced_v_from_next_s))
            game_score += r 
            if done:
                break
        return game_score
    def update_weights_A2C(self):
        A = 0 #GAE
        lam = 0.9 #0.5 0.9 test         
        self.V_optimizer.zero_grad()
        self.P_optimizer.zero_grad()
        for gradient_policy_a_s, r, v, v_next in self.trajectory[::-1]: 
            with torch.no_grad():           
                delta = r + 0.99* v_next - v
                A += (0.99 * lam) * delta  
                V_target = A + v   
            V_loss = (V_target.item() -  v)**2          
            V_loss.backward()
            P_loss = -1 * A.item() * gradient_policy_a_s            
            P_loss.backward()                
        self.V_optimizer.step()   
        self.P_optimizer.step()     
        self.trajectory.clear() 
        return 
Gradient계산될 필요가 없는 advantage function 계산 부분은 with torch.no_grad():를 통해 처리하였습니다. 계산그래프상 PolicyNetwork랑 ValueNetwork의 충돌을 막기 위해서 .item() 이나 .detach()를 사용.
아래는 학습시 측정한 게임 스코어 그래프입니다. lambda값에 따라 collapse 일어나는 정도가 조금 다른 듯 합니다. Lunarlander도 lambda를 크게 하고 episode를 길게 하면 학습이 됩니다.
Full code
import gym
import torch
import torch.nn as nn
from torch.distributions import Categorical
import matplotlib.pyplot as plt
!pip install gym[classic_control]
#!pip install gym[box2d] #for lunarlander
!apt update
!apt install xvfb
!pip install pyvirtualdisplay
!pip install gym-notebook-wrapper
import gnwrapper
!nvidia-smi
print(torch.cuda.is_available())
class Agent(nn.Module):
    def __init__(self, input_dim, output_dim, width):
        super().__init__()
        self.P = PolicyNetwork(input_dim, output_dim, width)
        self.P.to(device)
        self.P.train()  
        self.P_optimizer = torch.optim.Adam(self.P.parameters(), lr=0.0003)
        self.V = ValueNetwork(input_dim, output_dim, width)
        self.V.to(device)        
        self.V.train()
        self.V_optimizer = torch.optim.Adam(self.V.parameters(), lr=0.0003)   ## ?? 0,01
        self.trajectory = []       
        self.env = gym.make(game_name)        
    def self_play_A2C(self, max_timestep=1000000):
        game_score = 0
        state = self.env.reset() # env 시작
        for _ in range(max_timestep):
            output = self.P(torch.from_numpy(state).float().to(device)) # inference
            inferenced_v = self.V(torch.from_numpy(state).float().to(device))
            prob_distribution = Categorical(output) #확률분포 표현
            action = prob_distribution.sample() #확률분포로부터 action 선택
            state, r, done, _ = self.env.step(action.item()) # env 진행  
            with torch.no_grad():
                inferenced_v_from_next_s = self.V(torch.from_numpy(state).float().to(device))
            if done==True:
                inferenced_v_from_next_s = 0
             # a,r,v,v_next 저장 
            self.trajectory.append((prob_distribution.log_prob(action), r, inferenced_v, inferenced_v_from_next_s))
            game_score += r 
            if done:
                break
        return game_score
    def update_weights_A2C(self):
        A = 0 #GAE
        lam = 0.5 #0.1 0.9 test         
        self.V_optimizer.zero_grad()
        self.P_optimizer.zero_grad()
        for gradient_policy_a_s, r, v, v_next in self.trajectory[::-1]: 
            with torch.no_grad():           
                delta = r + 0.99* v_next - v
                A += (0.99 * lam) * delta  
                V_target = A + v   
            V_loss = (V_target.item() -  v)**2          
            V_loss.backward()            
            P_loss = -1 * A.item() * gradient_policy_a_s            
            P_loss.backward()             
        self.V_optimizer.step()   
        self.P_optimizer.step()             
        self.trajectory.clear() 
        return 
class PolicyNetwork(nn.Module):  
    def __init__(self, input_dim, output_dim, width):
        super().__init__()
        self.layer1 = torch.nn.Linear(input_dim, width)
        self.layer2 = torch.nn.Linear(width, width) 
        self.layer3 = torch.nn.Linear(width, output_dim) 
    def forward(self, x):
        x = self.layer1(x)
        x = torch.nn.functional.relu(x)
        x = self.layer2(x)
        x = torch.nn.functional.relu(x)
        x = self.layer3(x)
        x = torch.nn.functional.softmax(x, dim=0)
        return x
class ValueNetwork(nn.Module): 
    def __init__(self, input_dim, output_dim, width):
        super().__init__()
        self.layer1 = torch.nn.Linear(input_dim, width)
        self.layer2 = torch.nn.Linear(width, width) 
        self.layer3 = torch.nn.Linear(width, 1) 
    def forward(self, x):
        x = self.layer1(x)
        x = torch.nn.functional.relu(x)
        x = self.layer2(x)
        x = torch.nn.functional.relu(x)
        x = self.layer3(x)
        return x
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
score_arr = []
game_name = 'CartPole-v1' #'LunarLander-v2'
env = gym.make(game_name) 
agent = Agent(env.observation_space.shape[0], env.action_space.n, 128) 
print(agent)
env.close()
#Self play 및 weight update
episode_nums = 500 #LunarLander-v2는 더 길게
for i in range(episode_nums):    
    game_score = agent.self_play_A2C()
    agent.update_weights_A2C()    
    score_arr.append(game_score)  
    if i%50==0 : print('episode', i)    
torch.save(agent.state_dict(), 'weights.pt') 
agent.env.close()
#Episode별 얻은 score
plt.plot(score_arr, label ='score')
plt.legend(loc='upper left')
#학습된 모델로 게임 play한 영상
agent.load_state_dict(torch.load("weights.pt"))
env = gnwrapper.LoopAnimation(gym.make(game_name)) 
state = env.reset()
for _ in range(200):
    with torch.no_grad():
        output = agent.P(torch.from_numpy(state).float().to(device)) # inference
        prob_distribution = Categorical(output) #확률분포 표현
        action = prob_distribution.sample() #확률분포로부터 action 선택
    env.render()
    state, rew, done, _ = env.step(action.item())
    if done:
        state = env.reset()
env.display()
댓글
댓글 쓰기