개요
Advantage Actor-Critic(A2C) 알고리즘을 PyTorch 코드로 구현해보았으며 Colab에서 코드 실행 및 결과 확인이 가능합니다.
설명
Advantage Actor-Critic 알고리즘은 Actor-Critic에 Advantage function 개념이 적용되어 보다 안정적인 학습이 가능한 알고리즘입니다.
해당 계산을 위해 직접 action-value function(Q)을 학습시켜서 사용할수도 있지만, 여러 비효율적인 부분이 있어 state-value function(V)를 활용해 estimate하는 방법을 사용합니다.
이번 구현에서는 Generalized Advantage Estimation (GAE)를 사용해서 계산하였습니다.
간략한 연산 흐름은 다음과 같습니다.
1.self-play로 trajectory 모으기. (a,r,v,v_next)
2.advantage function 계산
3.value target 계산
4.loss계산 및 파라미터 업데이트
def self_play_A2C(self, max_timestep=1000000):
game_score = 0
state = self.env.reset() # env 시작
for _ in range(max_timestep):
output = self.P(torch.from_numpy(state).float().to(device)) # inference
inferenced_v = self.V(torch.from_numpy(state).float().to(device))
prob_distribution = Categorical(output) #확률분포 표현
action = prob_distribution.sample() #확률분포로부터 action 선택
state, r, done, _ = self.env.step(action.item()) # env 진행
with torch.no_grad():
inferenced_v_from_next_s = self.V(torch.from_numpy(state).float().to(device))
if done==True:
inferenced_v_from_next_s = 0
# a,r,v,v_next 저장
self.trajectory.append((prob_distribution.log_prob(action), r, inferenced_v, inferenced_v_from_next_s))
game_score += r
if done:
break
return game_score
def update_weights_A2C(self):
A = 0 #GAE
lam = 0.9 #0.5 0.9 test
self.V_optimizer.zero_grad()
self.P_optimizer.zero_grad()
for gradient_policy_a_s, r, v, v_next in self.trajectory[::-1]:
with torch.no_grad():
delta = r + 0.99* v_next - v
A += (0.99 * lam) * delta
V_target = A + v
V_loss = (V_target.item() - v)**2
V_loss.backward()
P_loss = -1 * A.item() * gradient_policy_a_s
P_loss.backward()
self.V_optimizer.step()
self.P_optimizer.step()
self.trajectory.clear()
return
Gradient계산될 필요가 없는 advantage function 계산 부분은 with torch.no_grad():를 통해 처리하였습니다. 계산그래프상 PolicyNetwork랑 ValueNetwork의 충돌을 막기 위해서 .item() 이나 .detach()를 사용.
아래는 학습시 측정한 게임 스코어 그래프입니다. lambda값에 따라 collapse 일어나는 정도가 조금 다른 듯 합니다. Lunarlander도 lambda를 크게 하고 episode를 길게 하면 학습이 됩니다.
Full code
import gym
import torch
import torch.nn as nn
from torch.distributions import Categorical
import matplotlib.pyplot as plt
!pip install gym[classic_control]
#!pip install gym[box2d] #for lunarlander
!apt update
!apt install xvfb
!pip install pyvirtualdisplay
!pip install gym-notebook-wrapper
import gnwrapper
!nvidia-smi
print(torch.cuda.is_available())
class Agent(nn.Module):
def __init__(self, input_dim, output_dim, width):
super().__init__()
self.P = PolicyNetwork(input_dim, output_dim, width)
self.P.to(device)
self.P.train()
self.P_optimizer = torch.optim.Adam(self.P.parameters(), lr=0.0003)
self.V = ValueNetwork(input_dim, output_dim, width)
self.V.to(device)
self.V.train()
self.V_optimizer = torch.optim.Adam(self.V.parameters(), lr=0.0003) ## ?? 0,01
self.trajectory = []
self.env = gym.make(game_name)
def self_play_A2C(self, max_timestep=1000000):
game_score = 0
state = self.env.reset() # env 시작
for _ in range(max_timestep):
output = self.P(torch.from_numpy(state).float().to(device)) # inference
inferenced_v = self.V(torch.from_numpy(state).float().to(device))
prob_distribution = Categorical(output) #확률분포 표현
action = prob_distribution.sample() #확률분포로부터 action 선택
state, r, done, _ = self.env.step(action.item()) # env 진행
with torch.no_grad():
inferenced_v_from_next_s = self.V(torch.from_numpy(state).float().to(device))
if done==True:
inferenced_v_from_next_s = 0
# a,r,v,v_next 저장
self.trajectory.append((prob_distribution.log_prob(action), r, inferenced_v, inferenced_v_from_next_s))
game_score += r
if done:
break
return game_score
def update_weights_A2C(self):
A = 0 #GAE
lam = 0.5 #0.1 0.9 test
self.V_optimizer.zero_grad()
self.P_optimizer.zero_grad()
for gradient_policy_a_s, r, v, v_next in self.trajectory[::-1]:
with torch.no_grad():
delta = r + 0.99* v_next - v
A += (0.99 * lam) * delta
V_target = A + v
V_loss = (V_target.item() - v)**2
V_loss.backward()
P_loss = -1 * A.item() * gradient_policy_a_s
P_loss.backward()
self.V_optimizer.step()
self.P_optimizer.step()
self.trajectory.clear()
return
class PolicyNetwork(nn.Module):
def __init__(self, input_dim, output_dim, width):
super().__init__()
self.layer1 = torch.nn.Linear(input_dim, width)
self.layer2 = torch.nn.Linear(width, width)
self.layer3 = torch.nn.Linear(width, output_dim)
def forward(self, x):
x = self.layer1(x)
x = torch.nn.functional.relu(x)
x = self.layer2(x)
x = torch.nn.functional.relu(x)
x = self.layer3(x)
x = torch.nn.functional.softmax(x, dim=0)
return x
class ValueNetwork(nn.Module):
def __init__(self, input_dim, output_dim, width):
super().__init__()
self.layer1 = torch.nn.Linear(input_dim, width)
self.layer2 = torch.nn.Linear(width, width)
self.layer3 = torch.nn.Linear(width, 1)
def forward(self, x):
x = self.layer1(x)
x = torch.nn.functional.relu(x)
x = self.layer2(x)
x = torch.nn.functional.relu(x)
x = self.layer3(x)
return x
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
score_arr = []
game_name = 'CartPole-v1' #'LunarLander-v2'
env = gym.make(game_name)
agent = Agent(env.observation_space.shape[0], env.action_space.n, 128)
print(agent)
env.close()
#Self play 및 weight update
episode_nums = 500 #LunarLander-v2는 더 길게
for i in range(episode_nums):
game_score = agent.self_play_A2C()
agent.update_weights_A2C()
score_arr.append(game_score)
if i%50==0 : print('episode', i)
torch.save(agent.state_dict(), 'weights.pt')
agent.env.close()
#Episode별 얻은 score
plt.plot(score_arr, label ='score')
plt.legend(loc='upper left')
#학습된 모델로 게임 play한 영상
agent.load_state_dict(torch.load("weights.pt"))
env = gnwrapper.LoopAnimation(gym.make(game_name))
state = env.reset()
for _ in range(200):
with torch.no_grad():
output = agent.P(torch.from_numpy(state).float().to(device)) # inference
prob_distribution = Categorical(output) #확률분포 표현
action = prob_distribution.sample() #확률분포로부터 action 선택
env.render()
state, rew, done, _ = env.step(action.item())
if done:
state = env.reset()
env.display()
댓글
댓글 쓰기