Compare commits
8 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 140ac03222 | |
| | dde9bd1759 | |
| | 0a73e87fd9 | |
| | 0c15382f8f | |
| | 4c3b81b779 | |
| | d3500bff48 | |
| | e92d445afc | |
| | fecea4f5a0 | |
2 ex2.py
@@ -119,5 +119,5 @@ def show(weights_path='cartpole_dqn.pth') -> None:
 if __name__ == '__main__':
-    trained_model = train_and_save()
+    #trained_model = train_and_save()
     show()
BIN requirements.txt
Binary file not shown.
147 tp3.py Normal file
@@ -0,0 +1,147 @@
import random
from collections import deque

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

ACTION_SET = [
    np.zeros(17),
    np.full(17, -0.4),
    np.full(17, 0.4),
    np.concatenate([np.full(8, 0.4), np.full(9, -0.4)])
]
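# The four entries above discretise Humanoid-v5's 17-dimensional continuous action space
# into coarse torque patterns (rest, all joints -0.4, all joints +0.4, and a split pattern),
# so a discrete-action DQN can be applied to a continuous-control task.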


class DQN(nn.Module):
    def __init__(self, n_states=348, n_actions=4):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_states, 64), nn.ReLU(),
            nn.Linear(64, n_actions)
        )

    def forward(self, x):
        """
        Forward pass of the network.
        :param x: torch.Tensor of shape [n_states]
        :return: torch.Tensor of shape [n_actions] with Q-values for each action
        """
        return self.net(x)


def train_and_save(weights_path="humanoid_dqn.pth", episodes=20_000, update_target_every=20):
    """
    Train a DQN agent on the Humanoid-v5 environment.
    :param weights_path: file path to save the learned network weights
    :param episodes: number of training episodes (complete games)
    :param update_target_every: how many episodes to wait before syncing the target network
    :return: trained Q-network ready to be used for inference
    """

    # environment setup
    env = gym.make("Humanoid-v5")
    n_states, n_actions = env.observation_space.shape[0], len(ACTION_SET)

    # the two DQNs
    policy_net = DQN(n_states, n_actions)                # Q-network
    target_net = DQN(n_states, n_actions)                # target network
    target_net.load_state_dict(policy_net.state_dict())  # same weights at start
    target_net.eval()

    # optimizer and hyperparameters
    optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
    gamma = 0.99        # discount factor
    epsilon = 1.0       # initial exploration rate
    eps_min = 0.01      # minimum exploration rate
    eps_decay = 0.9999  # epsilon decay factor
    memory = deque(maxlen=int(1e9))
    batch_size = 64

    # main training loop
    for ep in range(episodes):
        # env.reset() returns a tuple (initial_state, info_dict)
        s, _ = env.reset()
        s = torch.tensor(s, dtype=torch.float32)
        done, total_r = False, 0

        while not done:
            # epsilon-greedy at every action selection for finer-grained exploration
            # (a = action index, a_vector = the corresponding action vector)
            if random.random() < epsilon:
                a = random.randrange(n_actions)
            else:
                a = torch.argmax(policy_net(s)).item()
            a_vector = ACTION_SET[a]
            # env.step(a_vector) returns (next_state, reward, terminated, truncated, info)
            ns, r, terminated, truncated, _ = env.step(a_vector)
            done = terminated or truncated
            ns = torch.tensor(ns, dtype=torch.float32)

            memory.append((s, a, r, ns, done))
            s, total_r = ns, total_r + r

            # learning phase
            if len(memory) >= batch_size:
                batch = random.sample(memory, batch_size)
                s_b, a_b, r_b, ns_b, d_b = zip(*batch)
                s_b = torch.stack(s_b)
                ns_b = torch.stack(ns_b)

                # Q-values for the chosen actions
                q_pred = policy_net(s_b).gather(1, torch.tensor(a_b).unsqueeze(1)).squeeze()
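
                # Bellman backup: Q_target = r + gamma * (1 - done) * max_a' Q_target(s', a');
                # computing it with the frozen target network keeps the regression target stable.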
                # target values using the target network
                with torch.no_grad():
                    q_next = target_net(ns_b).max(1)[0]
                    q_target = torch.tensor(r_b, dtype=torch.float32) + \
                               gamma * q_next * (1 - torch.tensor(d_b, dtype=torch.float32))

                # MSE loss
                loss = ((q_pred - q_target) ** 2).mean()
                optimizer.zero_grad(); loss.backward(); optimizer.step()

            # decay epsilon to gradually reduce exploration
            epsilon = max(eps_min, epsilon * eps_decay)

        # periodically synchronise the target network with the policy network
        if (ep + 1) % update_target_every == 0:
            target_net.load_state_dict(policy_net.state_dict())

        if (ep + 1) % 20 == 0:
            print(f'Episode {ep + 1}: total reward {total_r:.1f}, epsilon {epsilon:.2f}')
    env.close()

    # save the trained policy network
    torch.save(policy_net.state_dict(), weights_path)
    print(f'Training finished. Weights saved to {weights_path}')
    return policy_net  # <--- trained Q-network


def show(weights_path="humanoid_dqn.pth") -> None:
    """
    Load the trained Q-network and run a single episode to visually
    demonstrate the learned policy.
    :param weights_path: path to the saved network weights
    :return:
    """
    env = gym.make("Humanoid-v5", render_mode="human")
    qnet = DQN()
    qnet.load_state_dict(torch.load(weights_path))
    qnet.eval()

    s, _ = env.reset()
    s = torch.tensor(s, dtype=torch.float32)
    done = False
    total_r = 0.0
    while not done:
        a = torch.argmax(qnet(s)).item()
        s_, r, terminated, truncated, _ = env.step(ACTION_SET[a])
        done = terminated or truncated
        s = torch.tensor(s_, dtype=torch.float32)
        total_r += r
    env.close()
    print(f'Demonstration finished. Reward: {total_r:.2f}')


if __name__ == '__main__':
    trained_model = train_and_save()
    show()
137 tp5.py Normal file
@@ -0,0 +1,137 @@
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt


class Actor(nn.Module):
    """
    The actor network: maps a state to a probability distribution over actions.
    """

    def __init__(self, n_states, n_actions):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_states, 128), nn.ReLU(),
            nn.Linear(128, 64), nn.ReLU(),
            nn.Linear(64, n_actions)
        )

    def forward(self, x):
        return torch.softmax(self.net(x), dim=-1)


class Critic(nn.Module):
    """
    The critic network: estimates the state value V(s).
    """

    def __init__(self, n_states):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_states, 64), nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        return self.net(x)


def train_and_save(weights_path="cartpole_actor_critic.pth", episodes=500):
    env = gym.make("CartPole-v1")
    n_states, n_actions = env.observation_space.shape[0], env.action_space.n

    # actor & critic network definitions
    actor_net = Actor(n_states, n_actions)
    critic_net = Critic(n_states)

    # hyperparameters and optimizers
    optimizer_actor = optim.Adam(actor_net.parameters(), lr=1e-3)
    optimizer_critic = optim.Adam(critic_net.parameters(), lr=5e-4)
    gamma = 0.99

    for ep in range(episodes):
        # current state given by the environment
        s, _ = env.reset()
        s = torch.tensor(s, dtype=torch.float32)

        # bookkeeping variables
        done, total_r = False, 0

        log_probs = []
        td_errors = []

        while not done:
            # actor: choose an action
            action_probs = actor_net(s)
            dist = torch.distributions.Categorical(action_probs)
            action = dist.sample()
            log_prob = dist.log_prob(action)

            # environment: apply the action
            ns, r, terminated, truncated, _ = env.step(action.item())
            done = terminated or truncated
            ns = torch.tensor(ns, dtype=torch.float32)
            total_r += r

            # critic: compute the TD error
            with torch.no_grad():
                value_ns = critic_net(ns) if not done else torch.tensor([0.0])  # force V(ns) = 0 when the episode is over
            value_n = critic_net(s)

            td_error = r + gamma * value_ns - value_n  # no detach here: the critic needs this gradient
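
            # The one-step TD error delta = r + gamma * V(s') - V(s) doubles as an advantage
            # estimate: the actor is pushed toward actions whose outcome beat the critic's
            # prediction, while the critic is trained to drive delta toward zero.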
            # actor loss
            actor_loss = -log_prob * td_error.detach()  # detach td_error for the actor update
            optimizer_actor.zero_grad()
            actor_loss.backward()
            optimizer_actor.step()

            # critic loss
            critic_loss = td_error.pow(2).mean()  # MSE
            optimizer_critic.zero_grad()
            critic_loss.backward()
            optimizer_critic.step()

            print("value_n:", value_n.item(), "value_ns:", value_ns.item(), "td_error:", td_error.item())

            log_probs.append(log_prob)
            td_errors.append(td_error)

            # update the state
            s = ns

        print(f'Episode {ep + 1}: total reward {total_r:.1f}')

    # release the environment resources
    env.close()

    # save the weights
    torch.save(actor_net.state_dict(), weights_path)
    print(f'Training finished. Weights saved to {weights_path}')
    return actor_net


def show(weights_path="cartpole_actor_critic.pth"):
    env = gym.make("CartPole-v1", render_mode="human")
    actor_net = Actor(env.observation_space.shape[0], env.action_space.n)
    actor_net.load_state_dict(torch.load(weights_path))
    actor_net.eval()
    s, _ = env.reset()
    s = torch.tensor(s, dtype=torch.float32)
    done = False
    while not done:
        with torch.no_grad():
            action_probs = actor_net(s)
            action = torch.argmax(action_probs).item()
        s_, r, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        s = torch.tensor(s_, dtype=torch.float32)
    env.close()
    print('Demonstration finished.')


if __name__ == '__main__':
    trained_model = train_and_save()
    show()
169 tp6.py Normal file
@@ -0,0 +1,169 @@
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt


# ——— Neural networks ———
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, state):
        return self.net(state)


class Critic(nn.Module):
    def __init__(self, state_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, state):
        return self.net(state)


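# Discounted Monte-Carlo return, computed backwards: R_t = r_t + gamma * R_{t+1};
# the advantage A_t = R_t - V(s_t) is then normalised to zero mean and unit variance
# before it is used to weight the policy gradient.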
def compute_returns(rewards, values, gamma):
    """Compute the discounted returns and the normalised advantages."""
    returns = []
    R = 0
    for r, v in zip(reversed(rewards), reversed(values)):
        R = r + gamma * R
        returns.insert(0, R)
    returns = torch.tensor(returns, dtype=torch.float32)
    values = torch.stack(values)
    advantages = returns - values.squeeze()
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
    return returns, advantages


def train_and_save():
    env = gym.make("CartPole-v1")
    actor = Actor(env.observation_space.shape[0], env.action_space.n)
    critic = Critic(env.observation_space.shape[0])

    optimizerA = optim.Adam(actor.parameters(), lr=3e-3)
    optimizerC = optim.Adam(critic.parameters(), lr=3e-3)
    gamma = 0.99

    nb_episodes = 1500
    rewards_history = []
    advantages_history = []
    critic_preds = []

    for episode in range(nb_episodes):
        state, _ = env.reset()
        done = False

        log_probs = []
        values = []
        rewards = []
        entropies = []

        while not done:
            state_tensor = torch.tensor(state, dtype=torch.float32)
            probs = actor(state_tensor)
            dist = torch.distributions.Categorical(probs)
            action = dist.sample()

            next_state, reward, terminated, truncated, _ = env.step(action.item())
            done = terminated or truncated

            value = critic(state_tensor)
            log_prob = dist.log_prob(action)
            entropy = dist.entropy()

            log_probs.append(log_prob)
            values.append(value)
            rewards.append(reward)
            entropies.append(entropy)

            state = next_state

        # ——— Compute returns and advantages ———
        returns, advantages = compute_returns(rewards, values, gamma)

        # ——— Actor update ———
        log_probs = torch.stack(log_probs)
        entropies = torch.stack(entropies)
        actor_loss = -(log_probs * advantages.detach()).mean() - 0.01 * entropies.mean()
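        # Policy-gradient loss: maximise log pi(a|s) weighted by the (detached) advantage;
        # the entropy bonus (coefficient 0.01) discourages premature convergence to a
        # deterministic policy and keeps exploration going.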

        optimizerA.zero_grad()
        actor_loss.backward()
        optimizerA.step()

        # ——— Critic update ———
        critic_loss = (returns - torch.stack(values).squeeze()).pow(2).mean()

        optimizerC.zero_grad()
        critic_loss.backward()
        optimizerC.step()

        total_reward = sum(rewards)
        rewards_history.append(total_reward)
        advantages_history.append(advantages.mean().item())
        critic_preds.append(torch.stack(values).mean().item())

        print(f"Episode {episode}, reward: {total_reward:.1f}")

        # ——— Plots every 500 episodes ———
        if episode % 500 == 0 and episode != 0:
            fig, axes = plt.subplots(1, 3, figsize=(15, 4))

            axes[0].plot(rewards_history, label='Rewards')
            axes[0].set_title('Rewards')
            axes[0].legend()

            axes[1].plot(advantages_history, label='Advantages', color='orange')
            axes[1].set_title('Advantages')
            axes[1].legend()

            axes[2].plot(critic_preds, label='Critic Prediction', color='green')
            axes[2].plot(rewards_history, label='Actual Reward', color='red', linestyle='--')
            axes[2].set_title('Critic vs Reward')
            axes[2].legend()

            plt.suptitle(f'Episode {episode}')
            plt.tight_layout()
            plt.show()

        if np.mean(rewards_history[-100:]) >= 475:
            print("I see this as an absolute win!")
            break

    torch.save(actor.state_dict(), "a2c_cartpole.pth")


def show(weights_path="a2c_cartpole.pth"):
    env = gym.make("CartPole-v1", render_mode="human")
    actor = Actor(env.observation_space.shape[0], env.action_space.n)
    actor.load_state_dict(torch.load(weights_path))
    actor.eval()

    state, _ = env.reset()
    done = False
    while not done:
        state_tensor = torch.tensor(state, dtype=torch.float32)
        with torch.no_grad():
            probs = actor(state_tensor)
        action = torch.argmax(probs).item()
        next_state, _, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        state = next_state
    env.close()
    print("Demonstration finished.")


if __name__ == "__main__":
    train_and_save()
    show()
178 tp7.py Normal file
@@ -0,0 +1,178 @@
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt


# ——— Neural networks ———
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
        )
        self.mu_head = nn.Linear(256, action_dim)
        self.log_std_head = nn.Linear(256, action_dim)

    def forward(self, state):
        x = self.net(state)
        mu = self.mu_head(x)
        log_std = torch.clamp(self.log_std_head(x), -20, 2)
        std = torch.exp(log_std)
        return mu, std


class Critic(nn.Module):
    def __init__(self, state_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )

    def forward(self, state):
        return self.net(state)


# ——— GAE ———
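# Generalised Advantage Estimation: delta_t = r_t + gamma * V(s_{t+1}) - V(s_t),
# A_t = delta_t + gamma * lambda * A_{t+1}; the returned targets are A_t + V(s_t),
# and the advantages are normalised before being fed to the actor loss.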
def compute_gae(rewards, values, gamma, lam, next_value):
    values = [v.detach() for v in values] + [next_value.detach()]
    gae = 0
    returns = []
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * values[t + 1] - values[t]
        gae = delta + gamma * lam * gae
        returns.insert(0, gae + values[t])
    returns = torch.tensor(returns, dtype=torch.float32)
    advantages = returns - torch.stack(values[:-1]).squeeze()
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
    return returns, advantages


# ——— Training ———
def train_and_save():
    env = gym.make("Pusher-v5")
    actor = Actor(env.observation_space.shape[0], env.action_space.shape[0])
    critic = Critic(env.observation_space.shape[0])

    optimizerA = optim.Adam(actor.parameters(), lr=1e-4)
    optimizerC = optim.Adam(critic.parameters(), lr=1e-4)

    gamma = 0.99
    lam = 0.95
    nb_episodes = 2000

    rewards_history = []
    advantages_history = []
    critic_preds = []
    td_errors = []

    for episode in range(nb_episodes):
        state, _ = env.reset()
        done = False

        log_probs = []
        values = []
        rewards = []
        entropies = []

        while not done:
            state_tensor = torch.tensor(state, dtype=torch.float32)
            mu, std = actor(state_tensor)
            dist = torch.distributions.Normal(mu, std)
            action = dist.rsample()

            # clamp so the action respects the environment's bounds
            low = torch.tensor(env.action_space.low, dtype=torch.float32)
            high = torch.tensor(env.action_space.high, dtype=torch.float32)
            action_clamped = torch.clamp(action, low, high)
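            # Note: the log-probability below is taken on the raw (pre-clamp) sample, so the
            # clamping is treated as part of the environment rather than of the policy.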

            next_state, reward, terminated, truncated, _ = env.step(action_clamped.detach().numpy())
            done = terminated or truncated

            reward_scaled = reward / 10.0  # scaling to stabilise learning

            value = critic(state_tensor)
            log_prob = dist.log_prob(action).sum(dim=-1)
            entropy = dist.entropy().sum(dim=-1)

            log_probs.append(log_prob)
            values.append(value)
            rewards.append(reward_scaled)
            entropies.append(entropy)

            state = next_state

        # next_value for GAE
        state_tensor = torch.tensor(state, dtype=torch.float32)
        next_value = critic(state_tensor).detach()  # even if done=True

        # ——— GAE ———
        returns, advantages = compute_gae(rewards, values, gamma, lam, next_value)

        # ——— Actor update ———
        log_probs = torch.stack(log_probs)
        entropies = torch.stack(entropies)
        actor_loss = -(log_probs * advantages.detach()).mean() - 0.02 * entropies.mean()  # reduced entropy coefficient

        optimizerA.zero_grad()
        actor_loss.backward()
        optimizerA.step()

        # ——— Critic update ———
        critic_loss = (returns - torch.stack(values).squeeze()).pow(2).mean()

        optimizerC.zero_grad()
        critic_loss.backward()
        optimizerC.step()

        total_reward = sum(rewards)
        rewards_history.append(total_reward)
        advantages_history.append(advantages.mean().item())
        critic_preds.append(torch.stack(values).mean().item())
        td_errors.append((returns - torch.stack(values).squeeze()).mean().item())

        print(f"Episode {episode}, reward: {total_reward:.2f}")

        # ——— Plots every 500 episodes ———
        if episode % 500 == 0 and episode != 0:
            fig, axes = plt.subplots(1, 4, figsize=(20, 4))
            axes[0].plot(rewards_history, label='Rewards'); axes[0].set_title('Rewards'); axes[0].legend()
            axes[1].plot(advantages_history, label='Advantages', color='orange'); axes[1].set_title('Advantages'); axes[1].legend()
            axes[2].plot(critic_preds, label='Critic Prediction', color='green'); axes[2].plot(rewards_history, label='Actual Reward', color='red', linestyle='--'); axes[2].set_title('Critic vs Reward'); axes[2].legend()
            axes[3].plot(td_errors, label='TD Error', color='purple'); axes[3].set_title('TD Error'); axes[3].legend()
            plt.suptitle(f'Episode {episode}')
            plt.tight_layout()
            plt.show()

    torch.save(actor.state_dict(), "a2c_pusher.pth")


def show(weights_path="a2c_pusher.pth"):
    env = gym.make("Pusher-v5", render_mode="human")
    actor = Actor(env.observation_space.shape[0], env.action_space.shape[0])
    actor.load_state_dict(torch.load(weights_path))
    actor.eval()

    state, _ = env.reset()
    done = False
    while not done:
        state_tensor = torch.tensor(state, dtype=torch.float32)
        with torch.no_grad():
            mu, _ = actor(state_tensor)
        action = mu.numpy()
        next_state, _, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        state = next_state
    env.close()
    print("Demonstration finished.")


if __name__ == "__main__":
    train_and_save()
    show()
113 tp7_gpt_exemple.py Normal file
@@ -0,0 +1,113 @@
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal


# -----------------------------
# Actor-Critic network
# -----------------------------
class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU()
        )
        self.actor_mean = nn.Linear(hidden_dim, action_dim)
        self.actor_logstd = nn.Parameter(torch.zeros(action_dim))
        self.critic = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = self.shared(x)
        mean = self.actor_mean(x)
        logstd = self.actor_logstd.expand_as(mean)
        dist = Normal(mean, logstd.exp())
        value = self.critic(x)
        return dist, value


# -----------------------------
# A2C agent
# -----------------------------
class A2CAgent:
    def __init__(self, env_name, gamma=0.99, lr=1e-3):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.env = gym.make(env_name)
        self.gamma = gamma

        state_dim = self.env.observation_space.shape[0]
        action_dim = self.env.action_space.shape[0]

        self.model = ActorCritic(state_dim, action_dim).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    def select_action(self, state):
        state = torch.FloatTensor(state).to(self.device)
        dist, _ = self.model(state)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1)
        return action.cpu().numpy(), log_prob

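    # Bootstrapped returns: R_t = r_t + gamma * R_{t+1} * mask_t, seeded with the critic's
    # value of the state reached at the end of the rollout; a mask of 0 at episode
    # boundaries stops the bootstrap from leaking across episodes.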
    def compute_returns(self, rewards, masks, next_value):
        R = next_value
        returns = []
        for step in reversed(range(len(rewards))):
            R = rewards[step] + self.gamma * R * masks[step]
            returns.insert(0, R)
        return returns

    def update(self, trajectory, next_state):
        states = torch.FloatTensor([t[0] for t in trajectory]).to(self.device)
        actions = torch.FloatTensor([t[1] for t in trajectory]).to(self.device)
        log_probs = torch.stack([t[2] for t in trajectory]).to(self.device)
        rewards = [t[3] for t in trajectory]
        masks = [t[4] for t in trajectory]

        with torch.no_grad():
            _, next_value = self.model(torch.FloatTensor(next_state).to(self.device))
            next_value = next_value.squeeze()
        returns = self.compute_returns(rewards, masks, next_value)
        returns = torch.FloatTensor(returns).to(self.device)

        dist, values = self.model(states)
        advantages = returns - values.squeeze()

        actor_loss = -(log_probs * advantages.detach()).mean()
        critic_loss = advantages.pow(2).mean()
        loss = actor_loss + 0.5 * critic_loss

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, max_steps=2000, update_every=5):
        state, _ = self.env.reset()
        trajectory = []

        for step in range(max_steps):
            action, log_prob = self.select_action(state)
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
            mask = 0.0 if done else 1.0  # <-- fix here: zero the bootstrap at episode end

            trajectory.append((state, action, log_prob, reward, mask))
            state = next_state

            if (step + 1) % update_every == 0:
                self.update(trajectory, next_state)
                trajectory = []

            if done:
                # start a new episode once the previous one has ended
                state, _ = self.env.reset()

            if (step + 1) % 100 == 0:
                print(f"Step {step + 1}, reward: {reward}")


# -----------------------------
# Run training
# -----------------------------
if __name__ == "__main__":
    agent = A2CAgent("Pusher-v5")
    agent.train(max_steps=2000)