Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
T
task_environment
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Antonio Andriella
task_environment
Commits
4dbd425f
Commit
4dbd425f
authored
4 years ago
by
Antonio Andriella
Browse files
Options
Downloads
Patches
Plain Diff
fix episode methods
parent
4135b611
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
episode.py
+212
-212
212 additions, 212 deletions
episode.py
with
212 additions
and
212 deletions
episode.py
+
212
−
212
View file @
4dbd425f
#
"""
"""
#
Episodes representing expert demonstrations and automated generation
Episodes representing expert demonstrations and automated generation
#
thereof.
thereof.
#
"""
"""
#
#
#
import numpy as np
import
numpy
as
np
#
from itertools import chain
from
itertools
import
chain
#
import itertools
import
itertools
#
import os
import
os
#
import time
import
time
#
import sys
import
sys
#
#
class Episode:
class
Episode
:
#
"""
"""
#
A episode consisting of states, corresponding actions, and outcomes.
A episode consisting of states, corresponding actions, and outcomes.
#
#
Args:
Args:
#
transitions: The transitions of this episode as an array of
transitions: The transitions of this episode as an array of
#
tuples `(state_from, action, state_to)`. Note that `state_to` of
tuples `(state_from, action, state_to)`. Note that `state_to` of
#
an entry should always be equal to `state_from` of the next
an entry should always be equal to `state_from` of the next
#
entry.
entry.
#
"""
"""
#
#
def __init__(self, states=[]):
def
__init__
(
self
,
states
=
[]):
#
self._t = list()
self
.
_t
=
list
()
#
for s in states:
for
s
in
states
:
#
self._t.append(tuple(s))
self
.
_t
.
append
(
tuple
(
s
))
#
#
def transition(self, state_from, action, state_to):
def
transition
(
self
,
state_from
,
action
,
state_to
):
#
self._t.append((state_from, action, state_to))
self
.
_t
.
append
((
state_from
,
action
,
state_to
))
#
#
def transitions(self):
def
transitions
(
self
):
#
"""
"""
#
The transitions of this episode.
The transitions of this episode.
#
#
Returns:
Returns:
#
All transitions in this episode as array of tuples
All transitions in this episode as array of tuples
#
`(state_from, action, state_to)`.
`(state_from, action, state_to)`.
#
"""
"""
#
return self._t
return
self
.
_t
#
#
def states(self):
def
states
(
self
):
#
"""
"""
#
The states visited in this episode.
The states visited in this episode.
#
#
Returns:
Returns:
#
All states visited in this episode as iterator in the order
All states visited in this episode as iterator in the order
#
they are visited. If a state is being visited multiple times,
they are visited. If a state is being visited multiple times,
#
the iterator will return the state multiple times according to
the iterator will return the state multiple times according to
#
when it is visited.
when it is visited.
#
"""
"""
#
return map(lambda x: x[0], chain(self._t, [(self._t[-1][2], 0, 0)]))
return
map
(
lambda
x
:
x
[
0
],
chain
(
self
.
_t
,
[(
self
.
_t
[
-
1
][
2
],
0
,
0
)]))
#
#
def __repr__(self):
def
__repr__
(
self
):
#
return "EpisodeGenerator({})".format(repr(self._t))
return
"
EpisodeGenerator({})
"
.
format
(
repr
(
self
.
_t
))
#
#
def __str__(self):
def
__str__
(
self
):
#
return "{}".format(self._t)
return
"
{}
"
.
format
(
self
.
_t
)
#
#
#
def get_states(self, states, initial_state):
def
get_states
(
self
,
states
,
initial_state
):
#
states_list = list(itertools.product(*states))
states_list
=
list
(
itertools
.
product
(
*
states
))
#
states_list.insert(0, initial_state)
states_list
.
insert
(
0
,
initial_state
)
#
return states_list
return
states_list
#
#
def state_from_point_to_index(self, states, point):
def
state_from_point_to_index
(
self
,
states
,
point
):
#
return states.index(tuple(point))
return
states
.
index
(
tuple
(
point
))
#
#
def state_from_index_to_point(self, state_tuple, index):
def
state_from_index_to_point
(
self
,
state_tuple
,
index
):
#
return state_tuple[index]
return
state_tuple
[
index
]
#
#
def load_episodes(self, file):
def
load_episodes
(
self
,
file
):
#
'''
'''
#
It returns the episodes related to the saved file
It returns the episodes related to the saved file
#
:param file:
:param file:
#
:param episode: look at main.py
:param episode: look at main.py
#
:param sol_per_pop: look at main.py
:param sol_per_pop: look at main.py
#
:return: a list of episodes
:return: a list of episodes
#
'''
'''
#
print("LOADING...")
print
(
"
LOADING...
"
)
#
#
trajs = list()
trajs
=
list
()
#
with open(file, "rb") as f:
with
open
(
file
,
"
rb
"
)
as
f
:
#
traj = np.load(f, allow_pickle=True)
traj
=
np
.
load
(
f
,
allow_pickle
=
True
)
#
for t in range(len(traj)):
for
t
in
range
(
len
(
traj
)):
#
trajs.append(Episode(traj[t]))
trajs
.
append
(
Episode
(
traj
[
t
]))
#
print("loaded traj ", t)
print
(
"
loaded traj
"
,
t
)
#
f.close()
f
.
close
()
#
for t in trajs:
for
t
in
trajs
:
#
print(t._t)
print
(
t
.
_t
)
#
return trajs
return
trajs
#
#
#
def generate_statistics(self, state_list, action_space, episodes):
def
generate_statistics
(
self
,
state_list
,
action_space
,
episodes
):
#
'''
'''
#
This function computes the state x state x action matrix that
This function computes the state x state x action matrix that
#
corresponds to the transition table we will use later
corresponds to the transition table we will use later
#
'''
'''
#
print(state_list)
print
(
state_list
)
#
n_states = len(state_list)
n_states
=
len
(
state_list
)
#
n_actions = len(action_space)
n_actions
=
len
(
action_space
)
#
#
#create a matrix state x state x action
#create a matrix state x state x action
#
table = np.zeros(shape=(n_states, n_states, n_actions))
table
=
np
.
zeros
(
shape
=
(
n_states
,
n_states
,
n_actions
))
#
start_time = time.time()
start_time
=
time
.
time
()
#
s1, s2, a = range(n_states), range(n_states), range(n_actions)
s1
,
s2
,
a
=
range
(
n_states
),
range
(
n_states
),
range
(
n_actions
)
#
for s_from in s1:
for
s_from
in
s1
:
#
for act in a:
for
act
in
a
:
#
for s_to in s2:
for
s_to
in
s2
:
#
#convert to coord
#convert to coord
#
s_from_coord = self.state_from_index_to_point(state_list, s_from)
s_from_coord
=
self
.
state_from_index_to_point
(
state_list
,
s_from
)
#
s_to_coord = self.state_from_index_to_point(state_list, s_to)
s_to_coord
=
self
.
state_from_index_to_point
(
state_list
,
s_to
)
#
#print("from:", s_from_coord," to:", s_to_coord)
#print("from:", s_from_coord," to:", s_to_coord)
#
#print()
#print()
#
for traj in episodes:
for
traj
in
episodes
:
#
if (s_from, act, s_to) in traj._t:
if
(
s_from
,
act
,
s_to
)
in
traj
.
_t
:
#
table[s_from, s_to, act] += 1
table
[
s_from
,
s_to
,
act
]
+=
1
#
elapsed_time = time.time()-start_time
elapsed_time
=
time
.
time
()
-
start_time
#
print("processing time:{}".format(elapsed_time))
print
(
"
processing time:{}
"
.
format
(
elapsed_time
))
#
return table
return
table
#
#
#
def compute_probabilities(self, transition_matrix, terminal_state, state_space):
def
compute_probabilities
(
self
,
transition_matrix
,
terminal_state
,
state_space
):
#
"""
"""
#
We compute the transitions for each state_from -> action -> state_to
We compute the transitions for each state_from -> action -> state_to
#
:param transition_matrix: matrix that has shape n_states x n_states x action
:param transition_matrix: matrix that has shape n_states x n_states x action
#
:return:
:return:
#
"""
"""
#
n_state_from, n_state_to, n_actions = transition_matrix.shape
n_state_from
,
n_state_to
,
n_actions
=
transition_matrix
.
shape
#
transition_matrix_with_prob = np.zeros((n_state_from, n_state_to, n_actions))
transition_matrix_with_prob
=
np
.
zeros
((
n_state_from
,
n_state_to
,
n_actions
))
#
#
for s_from in range(n_state_from):
for
s_from
in
range
(
n_state_from
):
#
s_in_prob = list()
s_in_prob
=
list
()
#
sum_over_prob = 0
sum_over_prob
=
0
#
#get the episode from s_from to all the possible state_to given the 5 actions
#get the episode from s_from to all the possible state_to given the 5 actions
#
#get all the occurrence on each column and compute the probabilities
#get all the occurrence on each column and compute the probabilities
#
#remember for each column the sum of probabilities has to be 1
#remember for each column the sum of probabilities has to be 1
#
for a in range(n_actions):
for
a
in
range
(
n_actions
):
#
trans_state_from = list(zip(*transition_matrix[s_from]))[a]
trans_state_from
=
list
(
zip
(
*
transition_matrix
[
s_from
]))[
a
]
#
#needs to be done to avoid nan (0/0)
#needs to be done to avoid nan (0/0)
#
#
sum_over_prob = sum(trans_state_from) if sum(trans_state_from)>0 else sys.float_info.min
sum_over_prob
=
sum
(
trans_state_from
)
if
sum
(
trans_state_from
)
>
0
else
sys
.
float_info
.
min
#
#
s_in_prob.append(list(map(lambda x: x/sum_over_prob, trans_state_from)))
s_in_prob
.
append
(
list
(
map
(
lambda
x
:
x
/
sum_over_prob
,
trans_state_from
)))
#
#
transition_matrix_with_prob[s_from][:][:] = np.asarray(s_in_prob).T
transition_matrix_with_prob
[
s_from
][:][:]
=
np
.
asarray
(
s_in_prob
).
T
#
#
for state in terminal_state:
for
state
in
terminal_state
:
#
state_idx = self.state_from_point_to_index(state_space, state)
state_idx
=
self
.
state_from_point_to_index
(
state_space
,
state
)
#
transition_matrix_with_prob[state_idx][state_idx][0] = 1
transition_matrix_with_prob
[
state_idx
][
state_idx
][
0
]
=
1
#
#
return transition_matrix_with_prob
return
transition_matrix_with_prob
#
#
#
def read_transition_matrix(self, file):
def
read_transition_matrix
(
self
,
file
):
#
print("Loading trans matrix...")
print
(
"
Loading trans matrix...
"
)
#
fileinfo = os.stat(file)
fileinfo
=
os
.
stat
(
file
)
#
trans_matrix = list()
trans_matrix
=
list
()
#
with open(file, "rb") as f:
with
open
(
file
,
"
rb
"
)
as
f
:
#
trans_matrix = np.load(f, allow_pickle=True)
trans_matrix
=
np
.
load
(
f
,
allow_pickle
=
True
)
#
#
#trans_matrix_reshaped = np.asarray(trans).reshape(n_states, n_states, n_actions)
#trans_matrix_reshaped = np.asarray(trans).reshape(n_states, n_states, n_actions)
#
print("Done")
print
(
"
Done
"
)
#
return trans_matrix
return
trans_matrix
#
#
#
def main():
def
main
():
#
pass
#
file_path = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/episodes.npy"
#
file_path = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/episodes.npy"
#
ep = Episode()
#
ep = Episode()
#
episodes = ep.load_episodes(file_path)
#
episodes = ep.load_episodes(file_path)
#
initial_state = (1, 1, 0)
#
initial_state = (1, 1, 0)
#
n_max_attempt = 5
#
n_max_attempt = 5
#
task_length = 6
#
task_length = 6
#
# Environment setup for RL agent assistance
#
# Environment setup for RL agent assistance
#
action_space = ['LEV_0', 'LEV_1', 'LEV_2', 'LEV_3', 'LEV_4', 'LEV_5']
#
action_space = ['LEV_0', 'LEV_1', 'LEV_2', 'LEV_3', 'LEV_4', 'LEV_5']
#
user_actions_state = [-1, 0, 1]
#
user_actions_state = [-1, 0, 1]
#
final_states = [(task_length, a, u) for a in range(1, n_max_attempt) for u in range(-1, 2) ]
#
final_states = [(task_length, a, u) for a in range(1, n_max_attempt) for u in range(-1, 2) ]
#
# defintion of state space
#
# defintion of state space
#
attempt = [i for i in range(1, n_max_attempt)]
#
attempt = [i for i in range(1, n_max_attempt)]
#
game_state = [i for i in range(1, task_length+1)]
#
game_state = [i for i in range(1, task_length+1)]
#
user_actions = [i for i in (user_actions_state)]
#
user_actions = [i for i in (user_actions_state)]
#
states_space = (game_state, attempt, user_actions) # , task_levels)
#
states_space = (game_state, attempt, user_actions) # , task_levels)
#
#
#
env = Environment(action_space, initial_state, final_states, user_actions, states_space,
#
env = Environment(action_space, initial_state, final_states, user_actions, states_space,
#
task_length, n_max_attempt, timeout=0, n_levels_assistance=6)
#
task_length, n_max_attempt, timeout=0, n_levels_assistance=6)
#
#
#
#
#
trans_matrix = ep.generate_statistics(env.states, env.action_space, episodes)
#
trans_matrix = ep.generate_statistics(env.states, env.action_space, episodes)
#
path_trans_matrix_occ = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/trans_matrix_occ.npy"
#
path_trans_matrix_occ = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/trans_matrix_occ.npy"
#
path_trans_matrix_prob = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/trans_matrix_prob.npy"
#
path_trans_matrix_prob = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/trans_matrix_prob.npy"
#
terminal_states = [env.point_to_index(state) for state in final_states]
#
terminal_states = [env.point_to_index(state) for state in final_states]
#
#
#
#
#
# save the episode on a file
#
# save the episode on a file
#
with open(path_trans_matrix_occ, "ab") as f:
#
with open(path_trans_matrix_occ, "ab") as f:
#
np.save(f, trans_matrix)
#
np.save(f, trans_matrix)
#
f.close()
#
f.close()
#
trans_matrix_occ = ep.read_trans_matrix(path_trans_matrix_occ)
#
trans_matrix_occ = ep.read_trans_matrix(path_trans_matrix_occ)
#
print(trans_matrix_occ.shape)
#
print(trans_matrix_occ.shape)
#
trans_matrix_prob = ep.compute_probabilities(trans_matrix_occ, terminal_states)
#
trans_matrix_prob = ep.compute_probabilities(trans_matrix_occ, terminal_states)
#
# save the episode on a file
#
# save the episode on a file
#
with open(path_trans_matrix_prob, "ab") as f:
#
with open(path_trans_matrix_prob, "ab") as f:
#
np.save(f, trans_matrix_prob)
#
np.save(f, trans_matrix_prob)
#
f.close()
#
f.close()
#
#
#
#prob = read_trans_matrix(path_trans_matrix_prob, 0, 0)
#
#prob = read_trans_matrix(path_trans_matrix_prob, 0, 0)
#
#
#
#
if __name__ == "__main__":
if
__name__
==
"
__main__
"
:
#
main()
main
()
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment