From dde9bd17593000c27f44e81759a5e389d6f19e0a Mon Sep 17 00:00:00 2001
From: Namu
Date: Fri, 17 Oct 2025 21:08:29 +0200
Subject: [PATCH] feat: add tp7

---
 tp7.py             | 135 ++++++++++++++++++++++++---------------
 tp7_gpt_exemple.py | 109 ++++++++++++++++++++++++++++++++++++
 2 files changed, 180 insertions(+), 64 deletions(-)
 create mode 100644 tp7_gpt_exemple.py

diff --git a/tp7.py b/tp7.py
index d6e96e8..b1f0e47 100644
--- a/tp7.py
+++ b/tp7.py
@@ -5,62 +5,71 @@ import torch.optim as optim
 import numpy as np
 import matplotlib.pyplot as plt
 
-
 # ——— Réseaux de neurones ———
 class Actor(nn.Module):
     def __init__(self, state_dim, action_dim):
         super().__init__()
         self.net = nn.Sequential(
-            nn.Linear(state_dim, 128),
+            nn.Linear(state_dim, 256),
+            nn.ReLU(),
+            nn.Linear(256, 256),
             nn.ReLU(),
-            nn.Linear(128, action_dim),
-            nn.Softmax(dim=-1)
         )
+        self.mu_head = nn.Linear(256, action_dim)
+        self.log_std_head = nn.Linear(256, action_dim)
 
     def forward(self, state):
-        return self.net(state)
-
+        x = self.net(state)
+        mu = self.mu_head(x)
+        log_std = torch.clamp(self.log_std_head(x), -20, 2)
+        std = torch.exp(log_std)
+        return mu, std
 
 class Critic(nn.Module):
     def __init__(self, state_dim):
         super().__init__()
         self.net = nn.Sequential(
-            nn.Linear(state_dim, 128),
+            nn.Linear(state_dim, 256),
             nn.ReLU(),
-            nn.Linear(128, 1)
+            nn.Linear(256, 256),
+            nn.ReLU(),
+            nn.Linear(256, 1)
         )
 
     def forward(self, state):
        return self.net(state)
 
-
-def compute_returns(rewards, values, gamma):
-    """Calcule les retours et avantages normalisés"""
+# ——— GAE ———
+def compute_gae(rewards, values, gamma, lam, next_value):
+    values = [v.detach() for v in values] + [next_value.detach()]
+    gae = 0
     returns = []
-    R = 0
-    for r, v in zip(reversed(rewards), reversed(values)):
-        R = r + gamma * R
-        returns.insert(0, R)
+    for t in reversed(range(len(rewards))):
+        delta = rewards[t] + gamma * values[t + 1] - values[t]
+        gae = delta + gamma * lam * gae
+        returns.insert(0, gae + values[t])
     returns = torch.tensor(returns, dtype=torch.float32)
-    values = torch.stack(values)
-    advantages = returns - values.squeeze()
+    advantages = returns - torch.stack(values[:-1]).squeeze()
     advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
     return returns, advantages
 
-
+# ——— Entraînement ———
 def train_and_save():
-    env = gym.make("CartPole-v1")
-    actor = Actor(env.observation_space.shape[0], env.action_space.n)
+    env = gym.make("Pusher-v5")
+    actor = Actor(env.observation_space.shape[0], env.action_space.shape[0])
     critic = Critic(env.observation_space.shape[0])
-    optimizerA = optim.Adam(actor.parameters(), lr=3e-3)
-    optimizerC = optim.Adam(critic.parameters(), lr=3e-3)
-    gamma = 0.99
+    optimizerA = optim.Adam(actor.parameters(), lr=1e-4)
+    optimizerC = optim.Adam(critic.parameters(), lr=1e-4)
+
+    gamma = 0.99
+    lam = 0.95
+    nb_episodes = 2000
 
-    nb_episodes = 1500
     rewards_history = []
     advantages_history = []
     critic_preds = []
+    td_errors = []
 
     for episode in range(nb_episodes):
         state, _ = env.reset()
@@ -73,30 +82,42 @@ while not done:
             state_tensor = torch.tensor(state, dtype=torch.float32)
-            probs = actor(state_tensor)
-            dist = torch.distributions.Categorical(probs)
-            action = dist.sample()
+            mu, std = actor(state_tensor)
+            dist = torch.distributions.Normal(mu, std)
+            action = dist.rsample()
 
-            next_state, reward, done, trunc, _ = env.step(action.item())
+            # clamp pour respecter les limites de l'environnement
+            low = torch.tensor(env.action_space.low, dtype=torch.float32)
+            high = torch.tensor(env.action_space.high, dtype=torch.float32)
+            action_clamped = torch.clamp(action, low, high)
+
+            next_state, reward, terminated, truncated, _ = env.step(action_clamped.detach().numpy())
+            done = terminated or truncated
+
+            reward_scaled = reward / 10.0  # scaling pour stabiliser l'apprentissage
 
             value = critic(state_tensor)
-            log_prob = dist.log_prob(action)
-            entropy = dist.entropy()
+            log_prob = dist.log_prob(action).sum(dim=-1)
+            entropy = dist.entropy().sum(dim=-1)
 
             log_probs.append(log_prob)
             values.append(value)
-            rewards.append(reward)
+            rewards.append(reward_scaled)
             entropies.append(entropy)
 
             state = next_state
 
-        # ——— Calcul des avantages et retours ———
-        returns, advantages = compute_returns(rewards, values, gamma)
+        # next_value pour GAE
+        state_tensor = torch.tensor(state, dtype=torch.float32)
+        next_value = critic(state_tensor).detach()  # même si done=True
+
+        # ——— GAE ———
+        returns, advantages = compute_gae(rewards, values, gamma, lam, next_value)
 
         # ——— Mise à jour Actor ———
         log_probs = torch.stack(log_probs)
         entropies = torch.stack(entropies)
-        actor_loss = -(log_probs * advantages.detach()).mean() - 0.01 * entropies.mean()
+        actor_loss = -(log_probs * advantages.detach()).mean() - 0.02 * entropies.mean()  # entropy coeff réduit
 
         optimizerA.zero_grad()
         actor_loss.backward()
@@ -113,57 +134,43 @@ def train_and_save():
         rewards_history.append(total_reward)
         advantages_history.append(advantages.mean().item())
         critic_preds.append(torch.stack(values).mean().item())
+        td_errors.append((returns - torch.stack(values).squeeze()).mean().item())
 
-        print(f"Épisode {episode}, Récompense : {total_reward:.1f}")
+        print(f"Épisode {episode}, Récompense : {total_reward:.2f}")
 
         # ——— Graphiques tous les 500 épisodes ———
         if episode % 500 == 0 and episode != 0:
-            fig, axes = plt.subplots(1, 3, figsize=(15, 4))
-
-            axes[0].plot(rewards_history, label='Rewards')
-            axes[0].set_title('Rewards')
-            axes[0].legend()
-
-            axes[1].plot(advantages_history, label='Advantages', color='orange')
-            axes[1].set_title('Advantages')
-            axes[1].legend()
-
-            axes[2].plot(critic_preds, label='Critic Prediction', color='green')
-            axes[2].plot(rewards_history, label='Actual Reward', color='red', linestyle='--')
-            axes[2].set_title('Critic vs Reward')
-            axes[2].legend()
-
-            plt.suptitle(f'Episode {episode}')
+            fig, axes = plt.subplots(1, 4, figsize=(20, 4))
+            axes[0].plot(rewards_history, label='Rewards'); axes[0].set_title('Rewards'); axes[0].legend()
+            axes[1].plot(advantages_history, label='Advantages', color='orange'); axes[1].set_title('Advantages'); axes[1].legend()
+            axes[2].plot(critic_preds, label='Critic Prediction', color='green'); axes[2].plot(rewards_history, label='Actual Reward', color='red', linestyle='--'); axes[2].set_title('Critic vs Reward'); axes[2].legend()
+            axes[3].plot(td_errors, label='TD Error', color='purple'); axes[3].set_title('TD Error'); axes[3].legend()
+            plt.suptitle(f'Épisode {episode}')
             plt.tight_layout()
             plt.show()
 
-        if np.mean(rewards_history[-100:]) >= 475:
-            print("I see this as an absolute win!")
-            break
+    torch.save(actor.state_dict(), "a2c_pusher.pth")
 
-    torch.save(actor.state_dict(), "a2c_cartpole.pth")
-
-
-def show(weights_path="a2c_cartpole.pth"):
-    env = gym.make("CartPole-v1", render_mode="human")
-    actor = Actor(env.observation_space.shape[0], env.action_space.n)
+# ——— Démonstration ———
+def show(weights_path="a2c_pusher.pth"):
+    env = gym.make("Pusher-v5", render_mode="human")
+    actor = Actor(env.observation_space.shape[0], env.action_space.shape[0])
     actor.load_state_dict(torch.load(weights_path))
     actor.eval()
 
     state, _ = env.reset()
     done = False
     while not done:
-        state_tensor = torch.tensor(state, dtype=torch.float32)
+        state_tensor = torch.tensor(state, dtype=torch.float32).detach()
         with torch.no_grad():
-            probs = actor(state_tensor)
-            action = torch.argmax(probs).item()
+            mu, _ = actor(state_tensor)
+            action = mu.detach().numpy()
         next_state, _, terminated, truncated, _ = env.step(action)
         done = terminated or truncated
         state = next_state
 
     env.close()
     print("Demonstration finished.")
 
-
 if __name__ == "__main__":
     train_and_save()
     show()
diff --git a/tp7_gpt_exemple.py b/tp7_gpt_exemple.py
new file mode 100644
index 0000000..5768983
--- /dev/null
+++ b/tp7_gpt_exemple.py
@@ -0,0 +1,109 @@
+import gymnasium as gym
+import torch
+import torch.nn as nn
+import torch.optim as optim
+from torch.distributions import Normal
+
+# -----------------------------
+# Réseau Actor-Critic
+# -----------------------------
+class ActorCritic(nn.Module):
+    def __init__(self, state_dim, action_dim, hidden_dim=128):
+        super().__init__()
+        self.shared = nn.Sequential(
+            nn.Linear(state_dim, hidden_dim),
+            nn.ReLU()
+        )
+        self.actor_mean = nn.Linear(hidden_dim, action_dim)
+        self.actor_logstd = nn.Parameter(torch.zeros(action_dim))
+        self.critic = nn.Linear(hidden_dim, 1)
+
+    def forward(self, x):
+        x = self.shared(x)
+        mean = self.actor_mean(x)
+        logstd = self.actor_logstd.expand_as(mean)
+        dist = Normal(mean, logstd.exp())
+        value = self.critic(x)
+        return dist, value
+
+# -----------------------------
+# Agent A2C
+# -----------------------------
+class A2CAgent:
+    def __init__(self, env_name, gamma=0.99, lr=1e-3):
+        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+        self.env = gym.make(env_name)
+        self.gamma = gamma
+
+        state_dim = self.env.observation_space.shape[0]
+        action_dim = self.env.action_space.shape[0]
+
+        self.model = ActorCritic(state_dim, action_dim).to(self.device)
+        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
+
+    def select_action(self, state):
+        state = torch.FloatTensor(state).to(self.device)
+        dist, _ = self.model(state)
+        action = dist.sample()
+        log_prob = dist.log_prob(action).sum(dim=-1)
+        return action.cpu().numpy(), log_prob
+
+    def compute_returns(self, rewards, masks, next_value):
+        R = next_value
+        returns = []
+        for step in reversed(range(len(rewards))):
+            R = rewards[step] + self.gamma * R * masks[step]
+            returns.insert(0, R)
+        return returns
+
+    def update(self, trajectory, next_state):
+        states = torch.FloatTensor([t[0] for t in trajectory]).to(self.device)
+        actions = torch.FloatTensor([t[1] for t in trajectory]).to(self.device)
+        log_probs = torch.stack([t[2] for t in trajectory]).to(self.device)
+        rewards = [t[3] for t in trajectory]
+        masks = [t[4] for t in trajectory]
+
+        with torch.no_grad():
+            _, next_value = self.model(torch.FloatTensor(next_state).to(self.device))
+            next_value = next_value.squeeze()
+        returns = self.compute_returns(rewards, masks, next_value)
+        returns = torch.FloatTensor(returns).to(self.device)
+
+        dist, values = self.model(states)
+        advantages = returns - values.squeeze()
+
+        actor_loss = -(log_probs * advantages.detach()).mean()
+        critic_loss = advantages.pow(2).mean()
+        loss = actor_loss + 0.5 * critic_loss
+
+        self.optimizer.zero_grad()
+        loss.backward()
+        self.optimizer.step()
+
+    def train(self, max_steps=2000, update_every=5):
+        state, _ = self.env.reset()
+        trajectory = []
+
+        for step in range(max_steps):
+            action, log_prob = self.select_action(state)
+            next_state, reward, terminated, truncated, _ = self.env.step(action)
+            done = terminated or truncated
+            mask = 0.0 if done else 1.0  # <-- correction ici
+
+            trajectory.append((state, action, log_prob, reward, mask))
+            state = next_state
+
+            if (step + 1) % update_every == 0:
+                self.update(trajectory, next_state)
+                trajectory = []
+
+            if (step + 1) % 100 == 0:
+                print(f"Step {step + 1}, reward: {reward}")
+
+
+# -----------------------------
+# Lancer l'entraînement
+# -----------------------------
+if __name__ == "__main__":
+    agent = A2CAgent("Pusher-v5")
+    agent.train(max_steps=2000)