feat: add tp7
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 23s
tp7.py · 135 changed lines
@@ -5,62 +5,71 @@ import torch.optim as optim
 import numpy as np
 import matplotlib.pyplot as plt


 # ——— Neural networks ———
 class Actor(nn.Module):
     def __init__(self, state_dim, action_dim):
         super().__init__()
         self.net = nn.Sequential(
-            nn.Linear(state_dim, 128),
+            nn.Linear(state_dim, 256),
+            nn.ReLU(),
+            nn.Linear(256, 256),
             nn.ReLU(),
-            nn.Linear(128, action_dim),
-            nn.Softmax(dim=-1)
         )
+        self.mu_head = nn.Linear(256, action_dim)
+        self.log_std_head = nn.Linear(256, action_dim)

     def forward(self, state):
-        return self.net(state)
+        x = self.net(state)
+        mu = self.mu_head(x)
+        log_std = torch.clamp(self.log_std_head(x), -20, 2)
+        std = torch.exp(log_std)
+        return mu, std


 class Critic(nn.Module):
     def __init__(self, state_dim):
         super().__init__()
         self.net = nn.Sequential(
-            nn.Linear(state_dim, 128),
+            nn.Linear(state_dim, 256),
             nn.ReLU(),
-            nn.Linear(128, 1)
+            nn.Linear(256, 256),
+            nn.ReLU(),
+            nn.Linear(256, 1)
         )

     def forward(self, state):
         return self.net(state)


+# ——— GAE ———
-def compute_returns(rewards, values, gamma):
-    """Compute the normalised returns and advantages"""
+def compute_gae(rewards, values, gamma, lam, next_value):
+    values = [v.detach() for v in values] + [next_value.detach()]
+    gae = 0
     returns = []
-    R = 0
-    for r, v in zip(reversed(rewards), reversed(values)):
-        R = r + gamma * R
-        returns.insert(0, R)
+    for t in reversed(range(len(rewards))):
+        delta = rewards[t] + gamma * values[t + 1] - values[t]
+        gae = delta + gamma * lam * gae
+        returns.insert(0, gae + values[t])
     returns = torch.tensor(returns, dtype=torch.float32)
-    values = torch.stack(values)
-    advantages = returns - values.squeeze()
+    advantages = returns - torch.stack(values[:-1]).squeeze()
     advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
     return returns, advantages


+# ——— Training ———
 def train_and_save():
-    env = gym.make("CartPole-v1")
-    actor = Actor(env.observation_space.shape[0], env.action_space.n)
+    env = gym.make("Pusher-v5")
+    actor = Actor(env.observation_space.shape[0], env.action_space.shape[0])
     critic = Critic(env.observation_space.shape[0])

-    optimizerA = optim.Adam(actor.parameters(), lr=3e-3)
-    optimizerC = optim.Adam(critic.parameters(), lr=3e-3)
+    optimizerA = optim.Adam(actor.parameters(), lr=1e-4)
+    optimizerC = optim.Adam(critic.parameters(), lr=1e-4)
     gamma = 0.99
+    lam = 0.95
-    nb_episodes = 1500
+    nb_episodes = 2000

     rewards_history = []
     advantages_history = []
     critic_preds = []
+    td_errors = []

     for episode in range(nb_episodes):
         state, _ = env.reset()
@@ -73,30 +82,42 @@ def train_and_save():

         while not done:
             state_tensor = torch.tensor(state, dtype=torch.float32)
-            probs = actor(state_tensor)
-            dist = torch.distributions.Categorical(probs)
-            action = dist.sample()
+            mu, std = actor(state_tensor)
+            dist = torch.distributions.Normal(mu, std)
+            action = dist.rsample()

-            next_state, reward, done, trunc, _ = env.step(action.item())
+            # clamp to stay within the environment's action bounds
+            low = torch.tensor(env.action_space.low, dtype=torch.float32)
+            high = torch.tensor(env.action_space.high, dtype=torch.float32)
+            action_clamped = torch.clamp(action, low, high)
+
+            next_state, reward, terminated, truncated, _ = env.step(action_clamped.detach().numpy())
+            done = terminated or truncated
+
+            reward_scaled = reward / 10.0  # scale rewards to stabilise learning

             value = critic(state_tensor)
-            log_prob = dist.log_prob(action)
-            entropy = dist.entropy()
+            log_prob = dist.log_prob(action).sum(dim=-1)
+            entropy = dist.entropy().sum(dim=-1)

             log_probs.append(log_prob)
             values.append(value)
-            rewards.append(reward)
+            rewards.append(reward_scaled)
             entropies.append(entropy)

             state = next_state

-        # ——— Compute advantages and returns ———
-        returns, advantages = compute_returns(rewards, values, gamma)
+        # next_value for GAE
+        state_tensor = torch.tensor(state, dtype=torch.float32)
+        next_value = critic(state_tensor).detach()  # even if done=True
+
+        # ——— GAE ———
+        returns, advantages = compute_gae(rewards, values, gamma, lam, next_value)

         # ——— Actor update ———
         log_probs = torch.stack(log_probs)
         entropies = torch.stack(entropies)
-        actor_loss = -(log_probs * advantages.detach()).mean() - 0.01 * entropies.mean()
+        actor_loss = -(log_probs * advantages.detach()).mean() - 0.02 * entropies.mean()  # entropy coefficient adjusted

         optimizerA.zero_grad()
         actor_loss.backward()
@@ -113,57 +134,43 @@ def train_and_save():
         rewards_history.append(total_reward)
         advantages_history.append(advantages.mean().item())
         critic_preds.append(torch.stack(values).mean().item())
+        td_errors.append((returns - torch.stack(values).squeeze()).mean().item())

-        print(f"Épisode {episode}, Récompense : {total_reward:.1f}")
+        print(f"Épisode {episode}, Récompense : {total_reward:.2f}")

         # ——— Plots every 500 episodes ———
         if episode % 500 == 0 and episode != 0:
-            fig, axes = plt.subplots(1, 3, figsize=(15, 4))
-            axes[0].plot(rewards_history, label='Rewards')
-            axes[0].set_title('Rewards')
-            axes[0].legend()
-            axes[1].plot(advantages_history, label='Advantages', color='orange')
-            axes[1].set_title('Advantages')
-            axes[1].legend()
-            axes[2].plot(critic_preds, label='Critic Prediction', color='green')
-            axes[2].plot(rewards_history, label='Actual Reward', color='red', linestyle='--')
-            axes[2].set_title('Critic vs Reward')
-            axes[2].legend()
-            plt.suptitle(f'Episode {episode}')
+            fig, axes = plt.subplots(1, 4, figsize=(20, 4))
+            axes[0].plot(rewards_history, label='Rewards'); axes[0].set_title('Rewards'); axes[0].legend()
+            axes[1].plot(advantages_history, label='Advantages', color='orange'); axes[1].set_title('Advantages'); axes[1].legend()
+            axes[2].plot(critic_preds, label='Critic Prediction', color='green'); axes[2].plot(rewards_history, label='Actual Reward', color='red', linestyle='--'); axes[2].set_title('Critic vs Reward'); axes[2].legend()
+            axes[3].plot(td_errors, label='TD Error', color='purple'); axes[3].set_title('TD Error'); axes[3].legend()
+            plt.suptitle(f'Épisode {episode}')
             plt.tight_layout()
             plt.show()

-        if np.mean(rewards_history[-100:]) >= 475:
-            print("I see this as an absolute win!")
-            break
-
-    torch.save(actor.state_dict(), "a2c_cartpole.pth")
+    torch.save(actor.state_dict(), "a2c_pusher.pth")


-def show(weights_path="a2c_cartpole.pth"):
-    env = gym.make("CartPole-v1", render_mode="human")
-    actor = Actor(env.observation_space.shape[0], env.action_space.n)
+# ——— Demonstration ———
+def show(weights_path="a2c_pusher.pth"):
+    env = gym.make("Pusher-v5", render_mode="human")
+    actor = Actor(env.observation_space.shape[0], env.action_space.shape[0])
     actor.load_state_dict(torch.load(weights_path))
     actor.eval()

     state, _ = env.reset()
     done = False
     while not done:
-        state_tensor = torch.tensor(state, dtype=torch.float32)
+        state_tensor = torch.tensor(state, dtype=torch.float32).detach()
         with torch.no_grad():
-            probs = actor(state_tensor)
-        action = torch.argmax(probs).item()
+            mu, _ = actor(state_tensor)
+        action = mu.detach().numpy()
         next_state, _, terminated, truncated, _ = env.step(action)
         done = terminated or truncated
         state = next_state
     env.close()
     print("Demonstration finished.")


 if __name__ == "__main__":
     train_and_save()
     show()
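For reference, the recurrence implemented by the new compute_gae above is delta_t = r_t + gamma * V(s_{t+1}) - V(s_t), followed by A_t = delta_t + gamma * lam * A_{t+1}. The sketch below is a minimal, self-contained illustration of that recurrence using plain Python floats and made-up numbers; the helper name gae_advantages and its inputs are illustrative only and are not part of this commit.

    # Minimal sketch of the GAE recurrence used by compute_gae (plain floats,
    # independent of tp7.py and torch; the example numbers are made up).
    def gae_advantages(rewards, values, next_value, gamma=0.99, lam=0.95):
        vals = list(values) + [next_value]   # bootstrap with the value of the last state
        adv = 0.0
        out = []
        for t in reversed(range(len(rewards))):
            delta = rewards[t] + gamma * vals[t + 1] - vals[t]   # TD error
            adv = delta + gamma * lam * adv                      # GAE recursion
            out.insert(0, adv)
        return out

    print(gae_advantages([1.0, 1.0, 1.0], [0.5, 0.5, 0.5], 0.5))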
tp7_gpt_exemple.py · 109 lines (new file)
@@ -0,0 +1,109 @@
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal


# -----------------------------
# Actor-Critic network
# -----------------------------
class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU()
        )
        self.actor_mean = nn.Linear(hidden_dim, action_dim)
        self.actor_logstd = nn.Parameter(torch.zeros(action_dim))
        self.critic = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = self.shared(x)
        mean = self.actor_mean(x)
        logstd = self.actor_logstd.expand_as(mean)
        dist = Normal(mean, logstd.exp())
        value = self.critic(x)
        return dist, value


# -----------------------------
# A2C agent
# -----------------------------
class A2CAgent:
    def __init__(self, env_name, gamma=0.99, lr=1e-3):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.env = gym.make(env_name)
        self.gamma = gamma

        state_dim = self.env.observation_space.shape[0]
        action_dim = self.env.action_space.shape[0]

        self.model = ActorCritic(state_dim, action_dim).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    def select_action(self, state):
        state = torch.FloatTensor(state).to(self.device)
        dist, _ = self.model(state)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1)
        return action.cpu().numpy(), log_prob

    def compute_returns(self, rewards, masks, next_value):
        R = next_value
        returns = []
        for step in reversed(range(len(rewards))):
            R = rewards[step] + self.gamma * R * masks[step]
            returns.insert(0, R)
        return returns

    def update(self, trajectory, next_state):
        states = torch.FloatTensor([t[0] for t in trajectory]).to(self.device)
        actions = torch.FloatTensor([t[1] for t in trajectory]).to(self.device)
        log_probs = torch.stack([t[2] for t in trajectory]).to(self.device)
        rewards = [t[3] for t in trajectory]
        masks = [t[4] for t in trajectory]

        with torch.no_grad():
            _, next_value = self.model(torch.FloatTensor(next_state).to(self.device))
            next_value = next_value.squeeze()
        returns = self.compute_returns(rewards, masks, next_value)
        returns = torch.FloatTensor(returns).to(self.device)

        dist, values = self.model(states)
        advantages = returns - values.squeeze()

        actor_loss = -(log_probs * advantages.detach()).mean()
        critic_loss = advantages.pow(2).mean()
        loss = actor_loss + 0.5 * critic_loss

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, max_steps=2000, update_every=5):
        state, _ = self.env.reset()
        trajectory = []

        for step in range(max_steps):
            action, log_prob = self.select_action(state)
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
            mask = 0.0 if done else 1.0  # <-- fix here

            trajectory.append((state, action, log_prob, reward, mask))
            state = next_state

            if (step + 1) % update_every == 0:
                self.update(trajectory, next_state)
                trajectory = []

            if (step + 1) % 100 == 0:
                print(f"Step {step + 1}, reward: {reward}")


# -----------------------------
# Run training
# -----------------------------
if __name__ == "__main__":
    agent = A2CAgent("Pusher-v5")
    agent.train(max_steps=2000)
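For reference, A2CAgent.compute_returns above bootstraps the n-step return with next_value and multiplies by mask, so a terminal step (mask = 0.0) stops value from leaking across episode boundaries. The sketch below is a minimal, self-contained illustration with plain Python floats and made-up numbers; the helper name masked_returns and its inputs are illustrative only and are not part of this commit.

    # Standalone sketch of the masked return recursion from A2CAgent.compute_returns
    # (plain floats, made-up numbers; gamma = 0.99).
    def masked_returns(rewards, masks, next_value, gamma=0.99):
        R = next_value
        out = []
        for t in reversed(range(len(rewards))):
            R = rewards[t] + gamma * R * masks[t]   # mask = 0.0 cuts the bootstrap at a terminal step
            out.insert(0, R)
        return out

    print(masked_returns([1.0, 1.0, 1.0], [1.0, 0.0, 1.0], 0.5))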