Skip to content
Snippets Groups Projects
Commit 920cb918 authored by Antonio Andriella's avatar Antonio Andriella
Browse files

extend plots and read_user_from_pickle function

parent f914d93c
No related branches found
No related tags found
No related merge requests found
"""
Episodes representing expert demonstrations and automated generation
thereof.
"""
from Environment import Environment
import numpy as np
from itertools import chain
import itertools
import os
import time
import sys
class Episode:
    """
    An episode consisting of states, corresponding actions, and outcomes.

    Args:
        states: Optional initial transitions as an iterable of tuples
            `(state_from, action, state_to)`. Note that `state_to` of
            an entry should always be equal to `state_from` of the next
            entry.
    """
    def __init__(self, states=()):
        # Immutable default instead of `states=[]` — a mutable default
        # would be shared across every instance created without arguments.
        self._t = [tuple(s) for s in states]

    def transition(self, state_from, action, state_to):
        """Append a single `(state_from, action, state_to)` transition."""
        self._t.append((state_from, action, state_to))

    def transitions(self):
        """
        The transitions of this episode.

        Returns:
            All transitions in this episode as list of tuples
            `(state_from, action, state_to)`.
        """
        return self._t

    def states(self):
        """
        The states visited in this episode.

        Returns:
            All states visited in this episode as iterator in the order
            they are visited. If a state is being visited multiple times,
            the iterator will return the state multiple times according to
            when it is visited.
        """
        # Guard the empty episode: indexing self._t[-1] unconditionally
        # would raise IndexError when no transitions were recorded.
        if not self._t:
            return iter(())
        # Append a sentinel transition so the final `state_to` is emitted.
        return map(lambda x: x[0], chain(self._t, [(self._t[-1][2], 0, 0)]))

    def __repr__(self):
        # Fixed: previously reported the wrong class name ("EpisodeGenerator").
        return "Episode({})".format(repr(self._t))

    def __str__(self):
        return "{}".format(self._t)
def generate_episode(world, policy, start, final):
    """
    Generate a single episode.

    Args:
        world: The world for which the episode should be generated.
        policy: A function (state: Integer) -> (action: Integer) mapping a
            state to an action, specifying which action to take in which
            state. This function may return different actions for multiple
            invokations with the same state, i.e. it may make a
            probabilistic decision and will be invoked anew every time a
            (new or old) state is visited (again).
        start: The starting state (as Integer index).
        final: A collection of terminal states. If a episode reaches a
            terminal state, generation is complete and the episode is
            returned.

    Returns:
        A generated Episode instance adhering to the given arguments.
    """
    transitions = []
    current = start
    # Sample successor states from the world's transition model until a
    # terminal state is reached.
    while current not in final:
        chosen_action = policy(current)
        successor_probs = world.p_transition[current, :, chosen_action]
        successor = np.random.choice(range(world.n_states), p=successor_probs)
        transitions.append((current, chosen_action, successor))
        current = successor
    return Episode(transitions)
def policy_adapter(policy):
    """
    A policy adapter for deterministic policies.

    Adapts a deterministic policy given as array or map
    `policy[state] -> action` for the episode-generation functions.

    Args:
        policy: The policy as map/array
            `policy[state: Integer] -> action: Integer`
            representing the policy function p(state).

    Returns:
        A function `(state: Integer) -> action: Integer` acting out the
        given policy.
    """
    def act(state):
        # Simple lookup: the deterministic policy fully determines the action.
        return policy[state]
    return act
def stochastic_policy_adapter(policy):
    """
    A policy adapter for stochastic policies.

    Adapts a stochastic policy given as array or map
    `policy[state, action] -> probability` for the episode-generation
    functions.

    Args:
        policy: The stochastic policy as map/array
            `policy[state: Integer, action: Integer] -> probability`
            representing the probability distribution p(action | state) of
            an action given a state.

    Returns:
        A function `(state: Integer) -> action: Integer` acting out the
        given policy, choosing an action randomly based on the distribution
        defined by the given policy.
    """
    n_actions = policy.shape[1]

    def act(state):
        # Draw an action index according to the row p(action | state).
        return np.random.choice(list(range(n_actions)), p=policy[state, :])

    return act
def get_states(states, initial_state):
    """Return the full state list: the Cartesian product of the per-dimension
    value lists in *states*, prefixed by *initial_state*."""
    state_list = [initial_state]
    state_list.extend(itertools.product(*states))
    return state_list
def point_to_index(point, states):
    """Return the position of *point* (coerced to a tuple) within *states*.

    Raises ValueError if the point is not present, as list.index does.
    """
    target = tuple(point)
    return states.index(target)
def state_from_index_to_coord(state_tuple, index):
    """Map a flat state *index* back to its coordinate entry in *state_tuple*."""
    coordinate = state_tuple[index]
    return coordinate
def load_episodes(file):
    """
    Load previously saved episodes from a NumPy file.

    :param file: path to a `.npy` file holding an array of transition
        lists (one per episode), as written by `np.save`.
    :return: a list of Episode instances, one per saved trajectory.
    """
    print("LOADING...")
    trajs = list()
    # NOTE(review): allow_pickle=True deserialises arbitrary Python objects —
    # only load files from trusted sources.
    with open(file, "rb") as f:
        traj = np.load(f, allow_pickle=True)
        for t in range(len(traj)):
            trajs.append(Episode(traj[t]))
            print("loaded traj ", t)
    # Redundant f.close() removed: the `with` block already closes the file.
    for t in trajs:
        print(t._t)
    return trajs
def generate_statistics(state_list, action_space, episodes):
    '''
    Compute the state x state x action occurrence table used later as the
    transition table.

    Each entry table[s_from, s_to, act] counts in how many episodes the
    transition (s_from, act, s_to) appears. A transition repeated within a
    single episode is counted once, matching the membership-test semantics
    of the original implementation.

    :param state_list: list of all states (its length fixes n_states)
    :param action_space: list of all actions (its length fixes n_actions)
    :param episodes: iterable of Episode objects (their `_t` transition lists
        hold `(state_from, action, state_to)` index tuples)
    :return: np.ndarray of shape (n_states, n_states, n_actions)
    '''
    print(state_list)
    n_states = len(state_list)
    n_actions = len(action_space)
    # create a matrix state x state x action
    table = np.zeros(shape=(n_states, n_states, n_actions))
    start_time = time.time()
    # Single pass over the recorded transitions: O(total transitions) instead
    # of the original O(n_states^2 * n_actions * total transitions) scan.
    for traj in episodes:
        # set(): a transition repeated inside one episode still counts once,
        # exactly like the original `(s_from, act, s_to) in traj._t` test.
        for s_from, act, s_to in set(traj._t):
            # Transitions outside the declared ranges were silently ignored
            # by the original range-based loops; preserve that.
            if 0 <= s_from < n_states and 0 <= s_to < n_states and 0 <= act < n_actions:
                table[s_from, s_to, act] += 1
    elapsed_time = time.time() - start_time
    print("processing time:{}".format(elapsed_time))
    return table
def compute_probabilities(transition_matrix, terminal_states):
    """
    Normalise an occurrence table into transition probabilities
    state_from -> action -> state_to.

    :param transition_matrix: occurrence counts of shape
        n_states x n_states x n_actions
    :param terminal_states: state indices whose self-transition under
        action 0 is forced to probability 1
    :return: array of the same shape holding p(state_to | state_from, action);
        each (state_from, action) column sums to 1 (or is all zero when the
        source state/action pair was never observed)
    """
    n_from, n_to, n_actions = transition_matrix.shape
    probabilities = np.zeros((n_from, n_to, n_actions))
    for src in range(n_from):
        per_action = []
        for act in range(n_actions):
            # Occurrence counts of every destination for this (src, act) pair.
            column = [transition_matrix[src, dest, act] for dest in range(n_to)]
            total = sum(column)
            if total <= 0:
                # avoid 0/0 -> NaN: dividing zero counts by the smallest
                # positive float yields clean zeros
                total = sys.float_info.min
            per_action.append([occurrences / total for occurrences in column])
        # per_action is (n_actions, n_to); transpose into (n_to, n_actions).
        probabilities[src] = np.asarray(per_action).T
    # Terminal states absorb: self-transition under action 0 is certain.
    for terminal in terminal_states:
        probabilities[terminal][terminal][0] = 1
    return probabilities
def read_trans_matrix(file):
    """
    Load a transition matrix previously written with `np.save`.

    :param file: path to the `.npy` file.
    :return: the loaded np.ndarray.
    """
    print("Loading trans matrix...")
    # Removed: unused `os.stat(file)` call and dead `trans_matrix = list()`
    # initialisation that was immediately overwritten.
    # NOTE(review): allow_pickle=True deserialises arbitrary Python objects —
    # only load files from trusted sources.
    with open(file, "rb") as f:
        trans_matrix = np.load(f, allow_pickle=True)
    print("Done")
    return trans_matrix
def main():
    # Pipeline: load recorded episodes, count transition occurrences against
    # the Environment's state/action spaces, normalise to probabilities, and
    # persist both matrices as .npy files.
    # NOTE(review): hard-coded absolute, user-specific path — parameterise
    # before reuse on another machine.
    file_path = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/episodes.npy"
    episodes = load_episodes(file_path)
    initial_state = (1, 1, 0)
    n_max_attempt = 5
    task_length = 6
    # Environment setup for RL agent assistance
    action_space = ['LEV_0', 'LEV_1', 'LEV_2', 'LEV_3', 'LEV_4', 'LEV_5']
    user_actions_state = [-1, 0, 1]
    # Terminal states: the game token reached task_length, for every
    # attempt count 1..n_max_attempt-1 and user action -1/0/1.
    final_states = [(task_length, a, u) for a in range(1, n_max_attempt) for u in range(-1, 2) ]
    # definition of state space
    attempt = [i for i in range(1, n_max_attempt)]
    game_state = [i for i in range(1, task_length+1)]
    user_actions = [i for i in (user_actions_state)]
    states_space = (game_state, attempt, user_actions) # , task_levels)
    env = Environment(action_space, initial_state, final_states, user_actions, states_space,
    task_length, n_max_attempt, timeout=0, n_levels_assistance=6)
    #
    # Count per-episode transition occurrences over the environment's spaces.
    trans_matrix = generate_statistics(env.states, env.action_space, episodes)
    path_trans_matrix_occ = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/trans_matrix_occ.npy"
    path_trans_matrix_prob = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/trans_matrix_prob.npy"
    terminal_states = [env.point_to_index(state) for state in final_states]
    # save the episode on a file
    # NOTE(review): mode "ab" appends — rerunning grows the file instead of
    # overwriting it; confirm this is intended.
    with open(path_trans_matrix_occ, "ab") as f:
        np.save(f, trans_matrix)
        f.close()  # redundant: the `with` block already closes the file
    trans_matrix_occ = read_trans_matrix(path_trans_matrix_occ)
    print(trans_matrix_occ.shape)
    trans_matrix_prob = compute_probabilities(trans_matrix_occ, terminal_states)
    # save the episode on a file
    with open(path_trans_matrix_prob, "ab") as f:
        np.save(f, trans_matrix_prob)
        f.close()  # redundant: the `with` block already closes the file
    #prob = read_trans_matrix(path_trans_matrix_prob, 0, 0)
if __name__ == "__main__":
    main()
...@@ -5,13 +5,13 @@ import pickle ...@@ -5,13 +5,13 @@ import pickle
def plot2D_game_performance(save_path, n_episodes, *y): def plot2D_game_performance(save_path, n_episodes, *y):
# The position of the bars on the x-axis # The position of the bars on the x-axis
barWidth = 0.35 barWidth = 0.35
r = np.arange(n_episodes) # the x locations for the groups r = np.arange(n_episodes)[1::10] # the x locations for the groups
# Get values from the group and categories # Get values from the group and categories
x = [i for i in range(n_episodes)] x = [i for i in range(n_episodes)][1::10]
correct = list(map(lambda x:x[0], y[0])) correct = list(map(lambda x:x[0], y[0]))[1::10]
wrong = list(map(lambda x:x[1], y[0])) wrong = list(map(lambda x:x[1], y[0]))[1::10]
timeout = list(map(lambda x:x[2], y[0])) timeout = list(map(lambda x:x[2], y[0]))[1::10]
max_attempt = list(map(lambda x:x[3], y[0])) max_attempt = list(map(lambda x:x[3], y[0]))[1::10]
# plot bars # plot bars
plt.figure(figsize=(10, 7)) plt.figure(figsize=(10, 7))
...@@ -33,15 +33,15 @@ def plot2D_game_performance(save_path, n_episodes, *y): ...@@ -33,15 +33,15 @@ def plot2D_game_performance(save_path, n_episodes, *y):
def plot2D_assistance(save_path, n_episodes, *y): def plot2D_assistance(save_path, n_episodes, *y):
# The position of the bars on the x-axis # The position of the bars on the x-axis
barWidth = 0.35 barWidth = 0.35
r = np.arange(n_episodes) # the x locations for the groups r = np.arange(n_episodes)[1::10] # the x locations for the groups
# Get values from the group and categories # Get values from the group and categories
x = [i for i in range(n_episodes)] x = [i for i in range(n_episodes)][1::10]
lev_0 = list(map(lambda x:x[0], y[0])) lev_0 = list(map(lambda x:x[0], y[0]))[1::10]
lev_1 = list(map(lambda x:x[1], y[0])) lev_1 = list(map(lambda x:x[1], y[0]))[1::10]
lev_2 = list(map(lambda x:x[2], y[0])) lev_2 = list(map(lambda x:x[2], y[0]))[1::10]
lev_3 = list(map(lambda x:x[3], y[0])) lev_3 = list(map(lambda x:x[3], y[0]))[1::10]
lev_4 = list(map(lambda x:x[4], y[0])) lev_4 = list(map(lambda x:x[4], y[0]))[1::10]
# plot bars # plot bars
plt.figure(figsize=(10, 7)) plt.figure(figsize=(10, 7))
...@@ -65,12 +65,12 @@ def plot2D_assistance(save_path, n_episodes, *y): ...@@ -65,12 +65,12 @@ def plot2D_assistance(save_path, n_episodes, *y):
def plot2D_feedback(save_path, n_episodes, *y): def plot2D_feedback(save_path, n_episodes, *y):
# The position of the bars on the x-axis # The position of the bars on the x-axis
barWidth = 0.35 barWidth = 0.35
r = np.arange(n_episodes) # the x locations for the groups r = np.arange(n_episodes)[1::10] # the x locations for the groups
# Get values from the group and categories # Get values from the group and categories
x = [i for i in range(n_episodes)] x = [i for i in range(n_episodes)][1::10]
feedback_no = list(map(lambda x:x[0], y[0])) feedback_no = list(map(lambda x:x[0], y[0]))[1::10]
feedback_yes = list(map(lambda x:x[1], y[0])) feedback_yes = list(map(lambda x:x[1], y[0]))[1::10]
# plot bars # plot bars
plt.figure(figsize=(10, 7)) plt.figure(figsize=(10, 7))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment