from __future__ import division
import os
import numpy as np
import tensorflow as tf
# TODO: remove the remaining uses of the Keras backend (K.mean, K.learning_phase)
import keras.backend as K
from rl.utils.model import clone_model, get_soft_target_model_ops
from rl.utils.numerics import gradient_inverter, huber_loss
from rl.memory import Experience
from rl.agents.rlagent import RLAgent
from rl.utils.printer import print_warning
# Whether to use Keras' predict_on_batch for inference instead of raw session.run calls
USE_KERAS_INFERENCE = False
class DDPGAgent(RLAgent):
"""
Deep Deterministic Policy Gradient Agent as defined in https://arxiv.org/abs/1509.02971.
:param keras.model actor: The actor network
:param keras.model critic: The critic network
:param gym.env env: The gym environment
:param memory: The memory object
:type memory: :class:`rl.memory.Memory`
:param float gamma: Discount factor
:param int batch_size: Size of the minibatches
:param int train_interval: Train only at multiples of this number
:param int memory_interval: Add experiences to memory only at multiples of this number
    :param critic_gradient_clip: Delta beyond which the critic error is clipped (via Huber loss, see https://github.com/devsisters/DQN-tensorflow/issues/16)
:param random_process: The noise used to perform exploration
    :param dict custom_model_objects: Custom Keras objects required to clone the actor and critic models
:param float target_critic_update: Target critic update factor
:param float target_actor_update: Target actor update factor
:param bool invert_gradients: Use gradient inverting as defined in https://arxiv.org/abs/1511.04143
"""
def __init__(self,
actor,
critic,
memory,
gamma=.99,
batch_size=32,
train_interval=1,
memory_interval=1,
critic_gradient_clip=100,
random_process=None,
custom_model_objects=None,
warmup_actor_steps=200,
warmup_critic_steps=200,
invert_gradients=False,
gradient_inverter_min=-1.,
gradient_inverter_max=1.,
actor_reset_threshold=0.3,
reset_controlers=False,
actor_learning_rate=1e-3,
critic_learning_rate=1e-4,
target_critic_update=0.01,
target_actor_update=0.01,
critic_regularization=0.01,
**kwargs):
if custom_model_objects is None:
custom_model_objects = {}
if hasattr(actor.output, '__len__') and len(actor.output) > 1:
raise ValueError(
'Actor "{}" has more than one output. DDPG expects an actor that has a single output.'.
format(actor))
if hasattr(critic.output, '__len__') and len(critic.output) > 1:
raise ValueError(
'Critic "{}" has more than one output. DDPG expects a critic that has a single output.'.
format(critic))
if not hasattr(critic.input, '__len__') or len(critic.input) < 2:
raise ValueError(
'Critic "{}" does not have enough inputs. The critic must have at exactly two inputs, one for the action and one for the observation.'.
format(critic))
super(DDPGAgent, self).__init__(name="ddpg", **kwargs)
# Get placeholders
self.variables["state"] = self.env.state
self.variables["action"] = self.env.action
# Parameters.
self.nb_actions = self.env.action_space.dim
self.actions_low = self.env.action_space.low
self.actions_high = self.env.action_space.high
self.random_process = random_process
self.critic_gradient_clip = critic_gradient_clip
self.gamma = gamma
self.warmup_actor_steps = warmup_actor_steps
self.warmup_critic_steps = warmup_critic_steps
self.critic_learning_rate = critic_learning_rate
self.actor_learning_rate = actor_learning_rate
self.critic_regularization = critic_regularization
(self.target_critic_update, self.target_critic_hard_updates
) = process_hard_update_variable(target_critic_update)
(self.target_actor_update, self.target_actor_hard_updates
) = process_hard_update_variable(target_actor_update)
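        # process_hard_update_variable (defined at module level below) returns a
        # (rate, use_hard_updates) pair: values >= 1 mean "hard-copy the target
        # network every `rate` steps", values in (0, 1) mean "soft-blend with
        # factor `rate` after every training step".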
self.batch_size = batch_size
self.train_interval = train_interval
self.memory_interval = memory_interval
self.custom_model_objects = custom_model_objects
self.invert_gradients = invert_gradients
if invert_gradients:
self.gradient_inverter_max = gradient_inverter_max
self.gradient_inverter_min = gradient_inverter_min
self.actor_reset_threshold = actor_reset_threshold
self.reset_controlers = reset_controlers
# Related objects.
self.actor = actor
self.critic = critic
self.memory = memory
# State.
self.compiled = False
self.reset_states()
@property
def uses_learning_phase(self):
return self.actor.uses_learning_phase or self.critic.uses_learning_phase
    def load_memory(self, memory):
"""Loads the given memory as the replay buffer"""
        del self.memory
self.memory = memory
def compile(self):
# Compile target networks. We only use them in feed-forward mode, hence we can pass any
# optimizer and loss since we never use it anyway.
self.target_actor = clone_model(self.actor, self.custom_model_objects)
self.target_actor.compile(optimizer='sgd', loss='mse')
self.target_critic = clone_model(self.critic,
self.custom_model_objects)
self.target_critic.compile(optimizer='sgd', loss='mse')
self.compile_actor()
self.compile_critic()
# Collect summaries directly from variables
for (var_name, variable) in self.variables.items():
self.summary_variables[var_name] = (tf.summary.scalar(
var_name, variable))
# Special selections of summary variables
# Critic
self.critic_summaries = [
value for (key, value) in self.summary_variables.items()
if (key.startswith("critic/") or key.startswith("target_critic/"))
]
# Critic post (run after training)
self.critic_summaries_post = [
value for (key, value) in self.summary_variables.items()
if (key.startswith("critic_post/")
or key.startswith("target_critic_post/"))
]
# Actor
# No need to collect the actor's loss, since we already have actor/objective
        self.actor_summaries = [
            value for (key, value) in self.summary_variables.items()
            if (key.startswith("actor/") and key != "actor/loss")
            or key.startswith("target_actor/")
        ]
# Actor post
self.actor_summaries_post = [
value for (key, value) in self.summary_variables.items()
if (key.startswith("actor_post/")
or key.startswith("target_actor_post/"))
]
# Initialize the remaining variables
        # FIXME: Use the Keras backend directly
        # This is something of a hack, adapted from "initialize_variables" in the
        # Keras TensorFlow backend:
        # https://github.com/fchollet/keras/blob/master/keras/backend/tensorflow_backend.py#L330
        # It initializes only the variables that are not already initialized.
        # Without it, the networks and target networks would be re-initialized to
        # different (stochastic) values, which is a problem when a network and its
        # target network must start from the same parameter values.
variables = tf.global_variables()
uninitialized_variables = []
for v in variables:
if not hasattr(v,
'_keras_initialized') or not v._keras_initialized:
uninitialized_variables.append(v)
v._keras_initialized = True
self.session.run(tf.variables_initializer(uninitialized_variables))
# self.session.run(tf.global_variables_initializer())
# Save the initial values of the networks
self.checkpoint()
self.compiled = True
def compile_actor(self):
# We also compile the actor. We never optimize the actor using Keras but instead compute
# the policy gradient ourselves. However, we need the actor in feed-forward mode, hence
# we also compile it with any optimizer
self.actor.compile(optimizer='sgd', loss='mse')
# Target actor optimizer
if not self.target_actor_hard_updates:
# Include soft target model updates.
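            # get_soft_target_model_ops presumably builds ops of the form
            #   target_w <- (1 - tau) * target_w + tau * w
            # with tau = self.target_actor_update, run once per training step.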
self.target_actor_train_op = get_soft_target_model_ops(
self.target_actor.weights, self.actor.weights,
self.target_actor_update)
# Actor optimizer
actor_optimizer = tf.train.AdamOptimizer(
learning_rate=self.actor_learning_rate)
        # Negate the objective: the optimizer minimizes, so maximizing the expected
        # Q-value amounts to minimizing its negative
self.variables["actor/loss"] = -tf.reduce_mean(
self.critic(
[self.variables["state"],
self.actor(self.variables["state"])]))
self.variables["actor/objective"] = -self.variables["actor/loss"]
actor_gradient_vars = actor_optimizer.compute_gradients(
self.variables["actor/loss"],
var_list=self.actor.trainable_weights)
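        # compute_gradients differentiates -Q(s, pi(s)) through both networks, which
        # yields the deterministic policy gradient of the DDPG paper:
        #   grad_theta J ~= mean_i [ grad_a Q(s_i, a)|_{a=pi(s_i)} * grad_theta pi(s_i) ]
        # (only the actor weights are updated since var_list is restricted to them)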
# Gradient inverting
# as described in https://arxiv.org/abs/1511.04143
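        # The inverting-gradients scheme scales a gradient component down as the
        # corresponding output approaches its bound and inverts it once the bound is
        # exceeded, keeping values within [gradient_inverter_min, gradient_inverter_max].
        # Here it is applied elementwise to the parameter gradients returned by
        # compute_gradients (exact behaviour depends on rl.utils.numerics.gradient_inverter,
        # which is not shown here).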
if self.invert_gradients:
actor_gradient_vars = [(gradient_inverter(
x[0], self.gradient_inverter_min, self.gradient_inverter_max),
x[1]) for x in actor_gradient_vars]
        # Compute the norm of each weight's gradient
actor_gradients_norms = [
tf.norm(grad_var[0]) for grad_var in actor_gradient_vars
]
for var, norm in zip(self.actor.trainable_weights,
actor_gradients_norms):
var_name = "actor/{}/gradient_norm".format(var.name)
self.variables[var_name] = (norm)
        # Also log the sum of the gradient norms
self.variables["actor/gradient_norm"] = tf.reduce_sum(
actor_gradients_norms)
# The actual train op
self.actor_train_op = actor_optimizer.apply_gradients(
actor_gradient_vars)
# Additional actor metrics
actor_norms = [
tf.norm(weight) for weight in self.actor.trainable_weights
]
for var, norm in zip(self.actor.trainable_weights, actor_norms):
var_name = "actor/{}/norm".format(var.name)
self.variables[var_name] = norm
self.variables["actor/norm"] = tf.reduce_sum(actor_norms)
# Additional target actor metrics
target_actor_norms = [
tf.norm(weight) for weight in self.target_actor.trainable_weights
]
        for var, norm in zip(self.target_actor.trainable_weights,
                             target_actor_norms):
var_name = "target_actor/{}/norm".format(var.name)
self.variables[var_name] = norm
self.variables["target_actor/norm"] = tf.reduce_sum(target_actor_norms)
def compile_critic(self):
# Compile the critic for the same reason
self.critic.compile(optimizer='sgd', loss='mse')
# Compile the critic optimizer
critic_optimizer = tf.train.AdamOptimizer(
learning_rate=self.critic_learning_rate)
        # Not to be confused with the target_critic!
self.critic_target = tf.placeholder(dtype=tf.float32, shape=(None, 1))
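        # This placeholder receives the TD targets computed in train_critic:
        #   y_i = r_i + gamma * Q'(s_{i+1}, pi'(s_{i+1}))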
# Clip the critic gradient using the huber loss
self.variables["critic/loss"] = K.mean(
huber_loss(
self.critic(
[self.variables["state"], self.variables["action"]]),
self.critic_target, self.critic_gradient_clip))
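        # The Huber loss is quadratic for errors smaller than the clip value and
        # linear beyond it, so the magnitude of the loss gradient with respect to the
        # critic output is bounded by critic_gradient_clip.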
# L2 regularization on the critic loss
critic_norms = [
tf.norm(weight) for weight in self.critic.trainable_weights
]
critic_norms_l2 = [
tf.nn.l2_loss(weight) for weight in self.critic.trainable_weights
]
self.variables["critic/norm"] = tf.reduce_sum(critic_norms)
self.variables["critic/l2_norm"] = tf.reduce_sum(critic_norms_l2)
if self.critic_regularization != 0:
self.variables[
"critic/loss"] += self.critic_regularization * self.variables[
"critic/l2_norm"]
# Compute gradients
critic_gradient_vars = critic_optimizer.compute_gradients(
self.variables["critic/loss"],
var_list=self.critic.trainable_weights)
# Compute the norm as a metric
critic_gradients_norms = [
tf.norm(grad_var[0]) for grad_var in critic_gradient_vars
]
for var, norm in zip(self.critic.trainable_weights,
critic_gradients_norms):
var_name = "critic/{}/gradient_norm".format(var.name)
self.variables[var_name] = norm
self.variables["critic/gradient_norm"] = tf.reduce_sum(
critic_gradients_norms)
self.critic_train_op = critic_optimizer.apply_gradients(
critic_gradient_vars)
# Additional critic metrics
for var, norm in zip(self.critic.trainable_weights, critic_norms):
var_name = "critic/{}/norm".format(var.name)
self.variables[var_name] = norm
# Additional target critic metrics
target_critic_norms = [
tf.norm(weight) for weight in self.target_critic.trainable_weights
]
for var, norm in zip(self.target_critic.trainable_weights,
target_critic_norms):
var_name = "target_critic/{}/norm".format(var.name)
self.variables[var_name] = norm
self.variables["target_critic/norm"] = tf.reduce_sum(
target_critic_norms)
# Target critic optimizer
if not self.target_critic_hard_updates:
# Include soft target model updates.
self.target_critic_train_op = get_soft_target_model_ops(
self.target_critic.weights, self.critic.weights,
self.target_critic_update)
def load_weights(self, filepath):
filename, extension = os.path.splitext(filepath)
actor_filepath = filename + '_actor' + extension
critic_filepath = filename + '_critic' + extension
self.actor.load_weights(actor_filepath)
self.critic.load_weights(critic_filepath)
self.hard_update_target_models()
def save_weights(self, filepath, overwrite=False):
print("Saving weights")
filename, extension = os.path.splitext(filepath)
actor_filepath = filename + '_actor' + extension
critic_filepath = filename + '_critic' + extension
self.actor.save_weights(actor_filepath, overwrite=overwrite)
self.critic.save_weights(critic_filepath, overwrite=overwrite)
    def save(self, name="DDPG"):
"""Save the model as an HDF5 file"""
self.actor.save(name + "_actor.h5")
self.critic.save(name + "_critic.h5")
def hard_update_target_critic(self):
print("Hard update of the target critic")
self.target_critic.set_weights(self.critic.get_weights())
def hard_update_target_actor(self):
print("Hard update of the target actor")
self.target_actor.set_weights(self.actor.get_weights())
def reset_states(self):
if self.random_process is not None:
self.random_process.reset_states()
if self.compiled:
self.actor.reset_states()
self.critic.reset_states()
self.target_actor.reset_states()
self.target_critic.reset_states()
def forward(self, observation):
        # Select an action.
        # Wrap the observation into an unprocessed batch of size 1
        batch_state = [observation]
        # Run the actor to get a batch containing a single action
# action = self.actor.predict_on_batch(batch_state)[0]
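        # Feeding K.learning_phase(): 0 runs the graph in inference mode
        # (e.g. dropout and batch normalisation behave as at test time)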
action = self.session.run(
self.actor(self.variables["state"]),
feed_dict={
self.variables["state"]: batch_state,
K.learning_phase(): 0
})[0]
assert action.shape == (self.nb_actions, )
# Apply noise, if a random process is set.
if self.exploration and self.random_process is not None:
noise = self.random_process.sample()
assert noise.shape == action.shape
action += noise
# Clip the action value, even if the noise is making it exceed its bounds
action = np.clip(action, self.actions_low, self.actions_high)
        return action
    def backward(self):
"""
Backward method of the DDPG agent
"""
# Stop here if not training
if not self.training:
return
# Store most recent experience in memory.
if self.training_step % self.memory_interval == 0:
self.memory.append(
Experience(self.observation, self.action,
self.reward, self.observation_1, self.done))
# Train the networks
if self.training_step % self.train_interval == 0:
            # Train the critic and the actor only once their respective warm-up
            # periods are over
            self.warmingup_critic = (self.training_step <= self.warmup_critic_steps)
            train_critic = not self.warmingup_critic
            self.warmingup_actor = (self.training_step <= self.warmup_actor_steps)
            train_actor = not self.warmingup_actor
self._backward(train_actor=train_actor, train_critic=train_critic)
    def backward_offline(self, train_actor=True, train_critic=True):
"""
Offline Backward method of the DDPG agent
:param bool offline: Add the new experiences to memory
:param bool train_actor: Activate of Deactivate training of the actor
:param bool train_critic: Activate of Deactivate training of the critic
"""
# Stop here if not training
if not self.training:
return
self._backward(train_actor=train_actor, train_critic=train_critic)
def _backward(self, train_actor=True, train_critic=True):
"""
Offline Backward method of the DDPG agent
:param bool offline: Add the new experiences to memory
:param bool train_actor: Activate of Deactivate training of the actor
:param bool train_critic: Activate of Deactivate training of the critic
"""
# Stop here if not training
if not self.training:
return
# Train the networks
if self.training_step % self.train_interval == 0:
# Hard update the target nets if necessary
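            # In hard-update mode, target_*_update holds an integer period (see
            # process_hard_update_variable), so the target networks are copied every
            # that many training steps; in soft-update mode they are blended after
            # every training step instead.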
if self.target_actor_hard_updates:
hard_update_target_actor = self.training_step % self.target_actor_update == 0
else:
hard_update_target_actor = False
if self.target_critic_hard_updates:
hard_update_target_critic = self.training_step % self.target_critic_update == 0
else:
hard_update_target_critic = False
# Whether to reset the actor
if self.done and (self.episode % 5 == 0) and self.reset_controlers:
can_reset_actor = True
else:
can_reset_actor = False
if (train_actor or train_critic):
self.train_controllers(
train_critic=train_critic,
train_actor=train_actor,
can_reset_actor=can_reset_actor,
hard_update_target_critic=hard_update_target_critic,
hard_update_target_actor=hard_update_target_actor)
    def train_controllers(self,
train_critic=True,
train_actor=True,
can_reset_actor=False,
hard_update_target_critic=False,
hard_update_target_actor=False):
"""
Fit the actor and critic networks
:param bool train_critic: Whether to fit the critic
:param bool train_actor: Whether to fit the actor
:param bool can_reset_actor:
"""
if not (train_actor or train_critic):
return
else:
batch = self.memory.sample(self.batch_size)
summaries = []
summaries_post = []
# Train networks
if train_critic:
summaries_critic, summaries_post_critic = self.train_critic(
batch)
summaries += summaries_critic
summaries_post += summaries_post_critic
if train_actor:
summaries_actor, summaries_post_actor = self.train_actor(
batch, can_reset_actor=can_reset_actor)
summaries += summaries_actor
summaries_post += summaries_post_actor
            # Update target networks
            # (the soft-update ops only exist when soft updates are configured)
            if hard_update_target_actor:
                self.hard_update_target_actor()
            elif not self.target_actor_hard_updates:
                self.session.run(self.target_actor_train_op)
            if hard_update_target_critic:
                self.hard_update_target_critic()
            elif not self.target_critic_hard_updates:
                self.session.run(self.target_critic_train_op)
self.step_summaries += summaries
self.step_summaries_post += summaries_post
    def train_critic(self, batch, sgd_iterations=1):
"""Fit the critic network"""
# Get the target action
# \pi(s_{t + 1})
if USE_KERAS_INFERENCE:
target_actions = self.target_actor.predict_on_batch(batch.state1)
else:
target_actions = self.session.run(
self.target_actor(self.variables["state"]),
feed_dict={
self.variables["state"]: batch.state1,
K.learning_phase(): 0
})
assert target_actions.shape == (self.batch_size, self.nb_actions)
# Get the target Q value of the next state
# Q(s_{t + 1}, \pi(s_{t + 1}))
if USE_KERAS_INFERENCE:
target_q_values = self.target_critic.predict_on_batch(
[batch.state1, target_actions])
else:
target_q_values = self.session.run(
self.target_critic(
[self.variables["state"], self.variables["action"]]),
feed_dict={
self.variables["state"]: batch.state1,
self.variables["action"]: target_actions
})
# Also works
# assert target_q_values.shape == (self.batch_size, )
        # Form the critic targets:
# r_t + gamma * Q(s_{t + 1}, \pi(s_{t + 1}))
discounted_reward_batch = self.gamma * target_q_values
critic_targets = (batch.reward + discounted_reward_batch)
feed_dict = {
self.variables["state"]: batch.state0,
self.variables["action"]: batch.action,
self.critic_target: critic_targets
}
# Collect summaries and metrics before training the critic
self.metrics["critic/gradient_norm"], summaries = self.session.run(
[self.variables["critic/gradient_norm"], self.critic_summaries], feed_dict=feed_dict)
# Train the critic
for _ in range(sgd_iterations):
# FIXME: The intermediate gradient values are not captured
self.session.run(self.critic_train_op, feed_dict=feed_dict)
# Collect summaries and metrics after training the critic
summaries_post = self.session.run(
self.critic_summaries_post, feed_dict=feed_dict)
return (summaries, summaries_post)
    def train_actor(self, batch, sgd_iterations=1, can_reset_actor=False):
"""Fit the actor network"""
feed_dict = {
self.variables["state"]: batch.state0,
K.learning_phase(): 1
}
        # Collect summaries and metrics before training the actor
self.metrics["actor/gradient_norm"], summaries = self.session.run(
[self.variables["actor/gradient_norm"], self.actor_summaries],
feed_dict=feed_dict)
# Train the actor
for _ in range(sgd_iterations):
# FIXME: The intermediate gradient values are not captured
self.session.run(self.actor_train_op, feed_dict=feed_dict)
        # Collect summaries after training the actor
summaries_post = self.session.run(
self.actor_summaries_post, feed_dict=feed_dict)
if can_reset_actor:
# Reset the actor if the gradient is flat
if self.metrics["actor/gradient_norm"] <= self.actor_reset_threshold:
# TODO: Use a gradient on a rolling window: multiple steps (and even multiple episodes)
self.restore_checkpoint(actor=True, critic=False)
return (summaries, summaries_post)
    def checkpoint(self):
"""Save the weights"""
self.checkpoints.append((self.actor.get_weights(),
self.critic.get_weights()))
    def restore_checkpoint(self, actor=True, critic=True, checkpoint_id=0):
"""Restore from checkpoint"""
weights_actor, weights_critic = self.checkpoints[checkpoint_id]
if actor:
print_warning("Restoring actor and target actor")
self.actor.set_weights(weights_actor)
self.target_actor.set_weights(weights_actor)
if critic:
print_warning("Restoring critic")
self.critic.set_weights(weights_critic)
self.target_critic.set_weights(weights_critic)
def process_hard_update_variable(param):
# Soft vs hard target model updates.
if param < 0:
raise ValueError(
'`target_model_update` must be >= 0, currently at {}'.format(
param))
elif param >= 1:
# Hard update every `target_model_update` steps.
param = int(param)
hard_updates = True
else:
# Soft update with `(1 - target_model_update) * old + target_model_update * new`.
param = float(param)
hard_updates = False
return (param, hard_updates)
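# Illustrative mapping (derived from the function above):
#   process_hard_update_variable(0.01) -> (0.01, False)   # soft updates, tau = 0.01
#   process_hard_update_variable(1000) -> (1000, True)    # hard copy every 1000 steps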