Advantage Actor-Critic (A2C) implementation

개요

Advantage Actor-Critic(A2C) 알고리즘을 PyTorch 코드로 구현해보았으며 Colab에서 코드 실행 및 결과 확인이 가능합니다.

설명

Advantage Actor-Critic 알고리즘은 Actor-Critic에 Advantage function 개념이 적용되어 보다 안정적인 학습이 가능한 알고리즘입니다. 
Advantage function A는 지금 state에서 어떤 액션을 취해 얻을수 있는 가치 대비 다른 액션들로부터 얻을수 있는 가치의 차이를 학습에 반영하기 위한 것입니다. 

해당 계산을 위해 직접 action-value function(Q)을 학습시켜서 사용할수도 있지만, 여러 비효율적인 부분이 있어 state-value function(V)를 활용해 estimate하는 방법을 사용합니다.

이번 구현에서는 Generalized Advantage Estimation (GAE)를 사용해서 계산하였습니다.
간략한 연산 흐름은 다음과 같습니다.
1.self-play로 trajectory 모으기. (a,r,v,v_next)
2.advantage function 계산
3.value target 계산
4.loss계산 및 파라미터 업데이트
    def self_play_A2C(selfmax_timestep=1000000):
        game_score = 0
        state = self.env.reset() # env 시작
        for _ in range(max_timestep):
            output = self.P(torch.from_numpy(state).float().to(device)) # inference
            inferenced_v = self.V(torch.from_numpy(state).float().to(device))
            prob_distribution = Categorical(output) #확률분포 표현
            action = prob_distribution.sample() #확률분포로부터 action 선택
            state, r, done, _ = self.env.step(action.item()) # env 진행  
            with torch.no_grad():
                inferenced_v_from_next_s = self.V(torch.from_numpy(state).float().to(device))
            if done==True:
                inferenced_v_from_next_s = 0
             # a,r,v,v_next 저장 
            self.trajectory.append((prob_distribution.log_prob(action), r, inferenced_v, inferenced_v_from_next_s))
            game_score += r 
            if done:
                break
        return game_score

    def update_weights_A2C(self):
        A = 0 #GAE
        lam = 0.9 #0.5 0.9 test         
        self.V_optimizer.zero_grad()
        self.P_optimizer.zero_grad()
        for gradient_policy_a_s, r, v, v_next in self.trajectory[::-1]: 
            with torch.no_grad():           
                delta = r + 0.99* v_next - v
                A += (0.99 * lam) * delta  
                V_target = A + v   

            V_loss = (V_target.item() -  v)**2          
            V_loss.backward()
            
            P_loss = -1 * A.item() * gradient_policy_a_s            
            P_loss.backward()                

        self.V_optimizer.step()   
        self.P_optimizer.step()     
        
        self.trajectory.clear() 
        return 

Gradient계산될 필요가 없는 advantage function 계산 부분은 with torch.no_grad():를 통해 처리하였습니다. 계산그래프상 PolicyNetwork랑 ValueNetwork의 충돌을 막기 위해서 .item() 이나 .detach()를 사용.

아래는 학습시 측정한 게임 스코어 그래프입니다. lambda값에 따라 collapse 일어나는 정도가 조금 다른 듯 합니다. Lunarlander도 lambda를 크게 하고 episode를 길게 하면 학습이 됩니다.
A2C(lambda 0.1)(cartpole) 
A2C(lambda 0.9)(cartpole) 
One-step Actor-Critic(cartpole) 
  REINFORCE with baseline(cartpole)
REINFORCE(cartpole)

Full code


import gym
import torch
import torch.nn as nn
from torch.distributions import Categorical
import matplotlib.pyplot as plt
!pip install gym[classic_control]
#!pip install gym[box2d] #for lunarlander
!apt update
!apt install xvfb
!pip install pyvirtualdisplay
!pip install gym-notebook-wrapper
import gnwrapper
!nvidia-smi
print(torch.cuda.is_available())
class Agent(nn.Module):
    def __init__(selfinput_dimoutput_dimwidth):
        super().__init__()
        self.P = PolicyNetwork(input_dim, output_dim, width)
        self.P.to(device)
        self.P.train()  
        self.P_optimizer = torch.optim.Adam(self.P.parameters(), lr=0.0003)
        self.V = ValueNetwork(input_dim, output_dim, width)
        self.V.to(device)        
        self.V.train()
        self.V_optimizer = torch.optim.Adam(self.V.parameters(), lr=0.0003)   ## ?? 0,01
        self.trajectory = []       
        self.env = gym.make(game_name)        

    def self_play_A2C(selfmax_timestep=1000000):
        game_score = 0
        state = self.env.reset() # env 시작
        for _ in range(max_timestep):
            output = self.P(torch.from_numpy(state).float().to(device)) # inference
            inferenced_v = self.V(torch.from_numpy(state).float().to(device))
            prob_distribution = Categorical(output) #확률분포 표현
            action = prob_distribution.sample() #확률분포로부터 action 선택
            state, r, done, _ = self.env.step(action.item()) # env 진행  
            with torch.no_grad():
                inferenced_v_from_next_s = self.V(torch.from_numpy(state).float().to(device))
            if done==True:
                inferenced_v_from_next_s = 0
             # a,r,v,v_next 저장 
            self.trajectory.append((prob_distribution.log_prob(action), r, inferenced_v, inferenced_v_from_next_s))
            game_score += r 
            if done:
                break
        return game_score

    def update_weights_A2C(self):
        A = 0 #GAE
        lam = 0.5 #0.1 0.9 test         
        self.V_optimizer.zero_grad()
        self.P_optimizer.zero_grad()
        for gradient_policy_a_s, r, v, v_next in self.trajectory[::-1]: 
            with torch.no_grad():           
                delta = r + 0.99* v_next - v
                A += (0.99 * lam) * delta  
                V_target = A + v   
            V_loss = (V_target.item() -  v)**2          
            V_loss.backward()            
            P_loss = -1 * A.item() * gradient_policy_a_s            
            P_loss.backward()             
        self.V_optimizer.step()   
        self.P_optimizer.step()             
        self.trajectory.clear() 
        return 
   
class PolicyNetwork(nn.Module):  
    def __init__(selfinput_dimoutput_dimwidth):
        super().__init__()
        self.layer1 = torch.nn.Linear(input_dim, width)
        self.layer2 = torch.nn.Linear(width, width) 
        self.layer3 = torch.nn.Linear(width, output_dim) 

    def forward(selfx):
        x = self.layer1(x)
        x = torch.nn.functional.relu(x)
        x = self.layer2(x)
        x = torch.nn.functional.relu(x)
        x = self.layer3(x)
        x = torch.nn.functional.softmax(x, dim=0)
        return x

class ValueNetwork(nn.Module): 
    def __init__(selfinput_dimoutput_dimwidth):
        super().__init__()
        self.layer1 = torch.nn.Linear(input_dim, width)
        self.layer2 = torch.nn.Linear(width, width) 
        self.layer3 = torch.nn.Linear(width, 1

    def forward(selfx):
        x = self.layer1(x)
        x = torch.nn.functional.relu(x)
        x = self.layer2(x)
        x = torch.nn.functional.relu(x)
        x = self.layer3(x)
        return x

device = torch.device('cuda:0'if torch.cuda.is_available() else torch.device('cpu')
score_arr = []
game_name = 'CartPole-v1' #'LunarLander-v2'
env = gym.make(game_name) 
agent = Agent(env.observation_space.shape[0], env.action_space.n, 128
print(agent)
env.close()

#Self play 및 weight update
episode_nums = 500 #LunarLander-v2는 더 길게
for i in range(episode_nums):    
    game_score = agent.self_play_A2C()
    agent.update_weights_A2C()    
    score_arr.append(game_score)  
    if i%50==0 : print('episode', i)    
torch.save(agent.state_dict(), 'weights.pt'
agent.env.close()
#Episode별 얻은 score
plt.plot(score_arr, label ='score')
plt.legend(loc='upper left')
#학습된 모델로 게임 play한 영상
agent.load_state_dict(torch.load("weights.pt"))
env = gnwrapper.LoopAnimation(gym.make(game_name)) 
state = env.reset()
for _ in range(200):
    with torch.no_grad():
        output = agent.P(torch.from_numpy(state).float().to(device)) # inference
        prob_distribution = Categorical(output) #확률분포 표현
        action = prob_distribution.sample() #확률분포로부터 action 선택
    env.render()
    state, rew, done, _ = env.step(action.item())
    if done:
        state = env.reset()
env.display()

댓글

가장 많이 본 글

구글 람다(LaMDA)란? - 구글의 언어 모델

알파고 강화학습 원리

버텍스 AI란? - 구글 인공지능 플랫폼

카타고와 바둑 두어보기

뉴럴 네트워크란?

블로그 글 목록

뉴럴 네트워크를 학습시키는 방법