feat: add tp3
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 23s
BIN
requirements.txt
Binary file not shown.
147
tp3.py
Normal file
@@ -0,0 +1,147 @@
import random
from collections import deque

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

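# Humanoid-v5 has a 17-dimensional continuous action space (torques clipped to
# [-0.4, 0.4]); a DQN can only choose among a finite set of actions, so the
# policy is restricted to the four fixed torque patterns listed below.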
ACTION_SET = [
    np.zeros(17),
    np.full(17, -0.4),
    np.full(17, 0.4),
    np.concatenate([np.full(8, 0.4), np.full(9, -0.4)])
]


class DQN(nn.Module):
    def __init__(self, n_states=348, n_actions=4):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_states, 64), nn.ReLU(),
            nn.Linear(64, n_actions)
        )

    def forward(self, x):
        """
        Forward pass of the network.
        :param x: torch.Tensor of shape [n_states]
        :return: torch.Tensor of shape [n_actions] with Q-values for each action
        """
        return self.net(x)


def train_and_save(weights_path="humanoid_dqn.pth", episodes=20_000, update_target_every=20):
    """
    Train a DQN agent on the Humanoid-v5 environment.
    :param weights_path: file path where the learned network weights are saved
    :param episodes: number of training episodes (complete games)
    :param update_target_every: how many episodes to wait before syncing the target network
    :return: trained Q-network ready to be used for inference
    """
    # environment setup
    env = gym.make("Humanoid-v5")
    n_states, n_actions = env.observation_space.shape[0], len(ACTION_SET)

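    # The target network is a delayed copy of the policy network; it is only
    # refreshed every `update_target_every` episodes, so the bootstrapped
    # targets stay fixed for a while, which stabilises DQN training.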
    # the two DQNs
    policy_net = DQN(n_states, n_actions)                # Q-network
    target_net = DQN(n_states, n_actions)                # target network
    target_net.load_state_dict(policy_net.state_dict())  # same weights at start
    target_net.eval()

    # optimizer and hyperparameters
    optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
    gamma = 0.99        # discount factor
    epsilon = 1.0       # initial exploration rate
    eps_min = 0.01      # minimum exploration rate
    eps_decay = 0.9999  # multiplicative epsilon decay factor
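    # Replay buffer: stores (state, action_index, reward, next_state, done)
    # transitions; mini-batches are sampled uniformly at random from it so that
    # consecutive, highly correlated steps are not learned from back to back.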
    memory = deque(maxlen=int(1e9))
    batch_size = 64

    # main training loop
    for ep in range(episodes):
        # env.reset() returns a tuple (initial_state, info_dict)
        s, _ = env.reset()
        s = torch.tensor(s, dtype=torch.float32)
        done, total_r = False, 0

        while not done:
            # epsilon-greedy at every action choice, for finer-grained exploration
            # (a = action index, a_vector = torque vector sent to the environment)
            if random.random() < epsilon:
                a = random.randrange(n_actions)
            else:
                a = torch.argmax(policy_net(s)).item()
            a_vector = ACTION_SET[a]
            # env.step(action) returns (next_state, reward, terminated, truncated, info)
            ns, r, terminated, truncated, _ = env.step(a_vector)
            done = terminated or truncated
            ns = torch.tensor(ns, dtype=torch.float32)

            memory.append((s, a, r, ns, done))
            s, total_r = ns, total_r + r

            # learning phase
            if len(memory) >= batch_size:
                batch = random.sample(memory, batch_size)
                s_b, a_b, r_b, ns_b, d_b = zip(*batch)
                s_b = torch.stack(s_b)
                ns_b = torch.stack(ns_b)

                # Q-values for the chosen actions
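                # policy_net(s_b) has shape [batch_size, n_actions]; gather(1, ...)
                # selects, for each sample, the Q-value of the action actually taken.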
                q_pred = policy_net(s_b).gather(1, torch.tensor(a_b).unsqueeze(1)).squeeze()

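                # DQN / Bellman target:
                #     y = r + gamma * (1 - done) * max_a' Q_target(s', a')
                # i.e. terminal transitions contribute only their immediate reward.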
                # target values computed with the frozen target network
                with torch.no_grad():
                    q_next = target_net(ns_b).max(1)[0]
                q_target = torch.tensor(r_b, dtype=torch.float32) + \
                           gamma * q_next * (1 - torch.tensor(d_b, dtype=torch.float32))

                # MSE loss between predicted and target Q-values
                loss = ((q_pred - q_target) ** 2).mean()
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # decay epsilon to gradually reduce exploration
            epsilon = max(eps_min, epsilon * eps_decay)

        # periodically synchronise the target network with the policy network
        if (ep + 1) % update_target_every == 0:
            target_net.load_state_dict(policy_net.state_dict())

        if (ep + 1) % 20 == 0:
            print(f'Episode {ep + 1}: total reward {total_r:.1f}, epsilon {epsilon:.2f}')
    env.close()

    # save the trained policy network
    torch.save(policy_net.state_dict(), weights_path)
    print(f'Training finished. Weights saved to {weights_path}')
    return policy_net  # <--- trained Q-network


def show(weights_path="humanoid_dqn.pth") -> None:
    """
    Load the trained Q-network and run a single episode to visually
    demonstrate the learned policy.
    :param weights_path: path to the saved network weights
    :return: None
    """
    env = gym.make("Humanoid-v5", render_mode="human")
    qnet = DQN()
    qnet.load_state_dict(torch.load(weights_path))
    qnet.eval()

    s, _ = env.reset()
    s = torch.tensor(s, dtype=torch.float32)
    done = False
    total_r = 0.0
    while not done:
        a = torch.argmax(qnet(s)).item()
        s_, r, terminated, truncated, _ = env.step(ACTION_SET[a])
        done = terminated or truncated
        s = torch.tensor(s_, dtype=torch.float32)
        total_r += r
    env.close()
    print(f'Demonstration finished. Reward: {total_r:.2f}')


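# Running this file first trains the agent for 20,000 episodes (which can take a
# long time) and then renders one demonstration episode; if humanoid_dqn.pth has
# already been saved, show() can be called on its own to skip retraining.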
if __name__ == '__main__':
    trained_model = train_and_save()
    show()