tp2-iaavancee/tp2.py
Namu e26f2ff15d
feat: Add the second exercise (first throw)
2025-09-28 20:40:47 +02:00


import random
from collections import deque
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim

class DQN(nn.Module):
    def __init__(self, n_states=6, n_actions=3):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_states, 64), nn.ReLU(),
            nn.Linear(64, n_actions)
        )

    def forward(self, x):
        """
        Forward pass of the network.
        :param x: torch.Tensor of shape [n_states] or [batch_size, n_states]
        :return: torch.Tensor with the Q-value of each action, shape [n_actions]
                 or [batch_size, n_actions]
        """
        return self.net(x)
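
# Usage sketch for DQN (illustrative only; `state` below is a hypothetical
# 6-dimensional Acrobot observation, and the snippet is not executed at import):
#   q_values = DQN()(torch.tensor(state, dtype=torch.float32))  # shape [3]
#   greedy_action = torch.argmax(q_values).item()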

def train_and_save(weights_path="acrobot_dqn.pth", episodes=2000, update_target_every=20):
    """
    Train a DQN agent on the Acrobot-v1 environment.
    :param weights_path: file path where the learned network weights are saved
    :param episodes: number of training episodes (complete games)
    :param update_target_every: number of episodes between two syncs of the target network
    :return: the trained Q-network, ready to be used for inference
    """
    # environment setup
    env = gym.make("Acrobot-v1")
    n_states, n_actions = env.observation_space.shape[0], env.action_space.n
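    # Acrobot-v1: 6-dimensional observation (cos/sin of both joint angles plus the
    # two angular velocities) and 3 discrete actions (torque -1, 0 or +1 on the
    # actuated joint); the reward is -1 per step until the tip reaches the target height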
    # the two DQNs
    policy_net = DQN(n_states, n_actions)                 # Q-network (updated by gradient descent)
    target_net = DQN(n_states, n_actions)                 # target network (synced periodically)
    target_net.load_state_dict(policy_net.state_dict())   # same weights at start
    target_net.eval()
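    # the target network only provides the bootstrap targets; keeping it frozen
    # between syncs avoids chasing a target that moves at every gradient step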
    # optimizer and hyperparameters
    optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
    gamma = 0.99        # discount factor
    epsilon = 1.0       # initial exploration rate
    eps_min = 0.05      # minimum exploration rate
    eps_decay = 0.995   # per-episode decay factor for epsilon
    memory = deque(maxlen=5000)   # replay buffer of past transitions
    batch_size = 64
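    # sampling transitions uniformly from the buffer breaks the correlation
    # between consecutive steps and lets each transition be reused several times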
    # main training loop
    for ep in range(episodes):
        # env.reset() returns a tuple (initial_state, info_dict)
        s, _ = env.reset()
        s = torch.tensor(s, dtype=torch.float32)
        done, total_r = False, 0
        while not done:
            # epsilon-greedy action selection
            if random.random() < epsilon:
                a = random.randrange(n_actions)
            else:
                with torch.no_grad():
                    a = torch.argmax(policy_net(s)).item()
            # env.step(a) returns (next_state, reward, terminated, truncated, info);
            # the episode ends on termination (goal reached) or truncation (time limit)
            ns, r, terminated, truncated, _ = env.step(a)
            done = terminated or truncated
            ns = torch.tensor(ns, dtype=torch.float32)
            # store only `terminated` so truncated episodes still bootstrap
            memory.append((s, a, r, ns, terminated))
            s, total_r = ns, total_r + r
            # learning phase: one gradient step on a random mini-batch
            if len(memory) >= batch_size:
                batch = random.sample(memory, batch_size)
                s_b, a_b, r_b, ns_b, d_b = zip(*batch)
                s_b = torch.stack(s_b)
                ns_b = torch.stack(ns_b)
                # Q-values predicted by the policy network for the chosen actions
                q_pred = policy_net(s_b).gather(1, torch.tensor(a_b).unsqueeze(1)).squeeze()
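                # TD target: y = r + gamma * max_a' Q_target(s', a'),
                # with the bootstrap term zeroed on terminal transitions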
                # target values computed with the target network
                with torch.no_grad():
                    q_next = target_net(ns_b).max(1)[0]
                q_target = torch.tensor(r_b, dtype=torch.float32) + \
                    gamma * q_next * (1 - torch.tensor(d_b, dtype=torch.float32))
                # MSE loss between predicted and target Q-values
                loss = ((q_pred - q_target) ** 2).mean()
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
        # decay epsilon once per episode to gradually reduce exploration
        epsilon = max(eps_min, epsilon * eps_decay)
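        # with a 0.995 decay per episode, epsilon reaches eps_min after roughly
        # 600 episodes (0.995 ** 600 ~= 0.05), i.e. about a third of the 2000 episodes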
        # periodically synchronise the target network with the policy network
        if (ep + 1) % update_target_every == 0:
            target_net.load_state_dict(policy_net.state_dict())
        if (ep + 1) % 20 == 0:
            print(f'Episode {ep + 1}: total reward {total_r:.1f}, epsilon {epsilon:.2f}')
    env.close()
    # save the trained policy network
    torch.save(policy_net.state_dict(), weights_path)
    print(f'Training finished. Weights saved to {weights_path}')
    return policy_net  # <--- trained Q-network

def show(weights_path="acrobot_dqn.pth") -> None:
    """
    Load the trained Q-network and run a single episode to visually
    demonstrate the learned policy.
    :param weights_path: path to the saved network weights
    :return: None
    """
    env = gym.make("Acrobot-v1", render_mode="human")
    qnet = DQN()
    qnet.load_state_dict(torch.load(weights_path))
    qnet.eval()
    s, _ = env.reset()
    s = torch.tensor(s, dtype=torch.float32)
    done = False
    while not done:
        # always act greedily during the demonstration (no exploration)
        with torch.no_grad():
            a = torch.argmax(qnet(s)).item()
        s_, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated
        s = torch.tensor(s_, dtype=torch.float32)
    env.close()
    print('Demonstration finished.')

if __name__ == '__main__':
    trained_model = train_and_save()
    show()