diff --git a/cognitive_game_env.py b/cognitive_game_env.py
new file mode 100644
index 0000000000000000000000000000000000000000..c40d882202c0d4a60cea61c5cdda0de5c89996b6
--- /dev/null
+++ b/cognitive_game_env.py
@@ -0,0 +1,357 @@
+"""
+Cognitive-Game Markov Decision Processes (MDPs).
+
+Some general remarks:
+  - The state of the agent is defined by the game progress, the number of
+    attempts and whether or not the previous move of the user was successful.
+
+  - Any action can be taken in any state and has a unique intended outcome.
+    The result of an action is stochastic, but there is always exactly one
+    outcome that can be described as the intended result of the action.
+"""
+
+import numpy as np
+import itertools
+import random
+from episode import Episode
+
+
+class CognitiveGame:
+    """
+    MDP formalisation of the cognitive game.
+    """
+
+    def __init__(self, initial_state, terminal_state, task_length,
+                 n_max_attempt, action_space, state_space,
+                 user_action, timeout, episode_list):
+        self.n_states = (task_length + 1) * n_max_attempt * len(user_action)
+        self.task_length = task_length
+        self.n_max_attempt = n_max_attempt
+        self.action_space = action_space
+        self.state_tuple = state_space
+        self.user_action = user_action
+        self.timeout = timeout
+        self.state_tuple_indexed = self.get_states_index(self.state_tuple)
+        self.initial_state_indexed = self.state_point_to_index(initial_state)
+        self.terminal_states_indexed = [self.state_point_to_index(state=final_state) for final_state in terminal_state]
+        self.rewards = self.initialise_rewards()
+        self.episode_instance = Episode()
+        self.p_transition = self.gen_transition_matrix(episode_list, state_space, action_space, terminal_state)
+
+    def initialise_rewards(self):
+        """Build the reward vector, indexed by state index: 1 for every terminal (goal) state, 0 elsewhere."""
+        rewards = np.zeros(self.n_states)
+        for state_idx in self.terminal_states_indexed:
+            rewards[state_idx] = 1
+        return rewards
+
+    def get_states_index(self, states_space):
+        states_index_list = [self.state_point_to_index(s) for s in states_space]
+        return states_index_list
+
+    def state_index_to_point(self, index):
+        """
+        Convert a state index to the coordinate representing it.
+
+        Args:
+            index: Integer representing the state.
+
+        Returns:
+            The coordinate as tuple of integers representing the same state
+            as the index.
+        """
+        return self.state_tuple[index]
+
+    def state_point_to_index(self, state):
+        """
+        Convert a state coordinate to the index representing it.
+
+        Note:
+            Does not check if the coordinates lie outside of the state space.
+
+        Args:
+            state: Tuple of integers representing the state.
+
+        Returns:
+            The index as integer representing the same state as the given
+            coordinate.
+        """
+        return self.state_tuple.index(tuple(state))
+
+    def generate_states(self, user_progress, user_attempt, user_action):
+        self.state_tuple = list(itertools.product(user_progress, user_attempt, user_action))
+        return self.state_tuple
+
+    def state_index_transition(self, s, a):
+        """
+        Perform action `a` at state `s` and return the intended next state.
+
+        Does not take into account the transition probabilities. Instead it
+        just returns the intended outcome of the given action taken at the
+        given state, i.e. the outcome in case the action succeeds.
+
+        Args:
+            s: The state at which the action should be taken.
+            a: The action that should be taken.
+
+        Returns:
+            The next state as implied by the given action and state.
+        """
+        # intended outcome = most likely successor under the learnt transition model
+        prob_next_states = self.p_transition[s, :, a]
+
+        # terminal states are absorbing
+        if s in self.terminal_states_indexed:
+            return s
+
+        s_next = np.argmax(prob_next_states)
+        return s_next
+
+    def gen_transition_matrix(self, episodes, state_space, action_space, terminal_state):
+        '''
+        Generate the transition probability matrix from a set of expert episodes.
+        Args:
+            episodes: the list of expert episodes
+            state_space: the list of states
+            action_space: the list of actions
+            terminal_state: the list of terminal states
+        Return:
+            the transition probability matrix of shape [state_from, state_to, action]
+        '''
+        trans_matrix_occ = self.episode_instance.generate_statistics(state_space, action_space, episodes)
+        trans_matrix_prob = self.episode_instance.compute_probabilities(trans_matrix_occ, terminal_state, state_space)
+        return trans_matrix_prob
+
+    def _transition_prob_table(self):
+        """
+        Builds the internal probability transition table.
+
+        Returns:
+            The probability transition table of the form
+
+                [state_from, state_to, action]
+
+            containing all transition probabilities. The individual
+            transition probabilities are defined by `self._transition_prob`.
+        """
+        n_actions = len(self.action_space)
+        table = np.zeros(shape=(self.n_states, self.n_states, n_actions))
+
+        s1, s2, a = range(self.n_states), range(self.n_states), range(n_actions)
+        for s_from in s1:
+            for act in a:
+                for s_to in s2:
+                    table[s_from, s_to, act] = self._transition_prob(s_from, s_to, act, 0)
+
+        # terminal states are absorbing
+        for state in self.terminal_states_indexed:
+            table[state][state][0] = 1
+
+        return table
+
+    def load_transition_prob(self, file, terminal_states):
+        """
+        Load the transition matrix from a file.
+        Args:
+            file: The npy file where the transition probabilities have been saved
+        Returns:
+            the transition probability matrix
+        """
+        print("Loading file ...")
+        with open(file, "rb") as f:
+            table = np.load(f, allow_pickle=True)
+
+        n_actions = len(self.action_space)
+        s1, s2, a = range(self.n_states), range(self.n_states), range(n_actions)
+        # replace any NaN entry with a zero probability
+        for s_from in s1:
+            for act in a:
+                for s_to in s2:
+                    if np.isnan(table[s_from, s_to, act]):
+                        table[s_from, s_to, act] = 0
+
+        # terminal states are absorbing
+        for state in terminal_states:
+            point_to_index = self.state_point_to_index(state)
+            table[point_to_index][point_to_index][0] = 1
+
+        return table
+
+    def _transition_prob(self, s_from, s_to, a, value):
+        """
+        Compute the transition probability for a single transition.
+
+        Args:
+            s_from: The state in which the transition originates.
+            s_to: The target-state of the transition.
+            a: The action via which the target state should be reached.
+
+        Returns:
+            The transition probability from `s_from` to `s_to` when taking
+            action `a`.
+ """ + fx, fy = self.state_index_to_point(s_from) + tx, ty = self.state_index_to_point(s_to) + lev = (self.actions[a]) + index_lev = self.actions.index(lev) + states_actions = 3*[0] + + max_attempt_states = [(i, self.n_attempt) for i in range(1, self.n_solution+1)] + + + if (fx, fy) in max_attempt_states: + next_states = [(fx + 1, 1)] + elif s_from in self.final_states: + next_states = [(fx, fy + 1), (fx, fy)] + else: + next_states = [(fx + 1, 1), (fx, fy + 1), (fx, fy)] + + + + if (fx, fy) in max_attempt_states and s_from in self.final_states: + return 0.0 + + elif self.state_index_to_point(s_to) not in next_states: + return 0.0 + + elif (fx, fy) in max_attempt_states and (tx==fx+1 and ty == fy): + return 1.0 + + print("prev_state:", tx,ty) + + sum = 0 + prob = list() + actions = [0.1, 0.3, 0.5, 0.8, 10] + game_timeline = [1, 1.2, 1.5, 2, 2.5] + attempt_timeline = [1, 1.2, 1.5, 2] + sum_over_actions = 0 + for next_state in next_states: + prob.append(actions[a] * game_timeline[next_state[0]-1] * attempt_timeline[next_state[1]-1]) + sum_over_actions += actions[a] * game_timeline[next_state[0]-1] * attempt_timeline[next_state[1]-1] + norm_prob = list(map(lambda x: x / sum_over_actions, prob)) + i = 0 + for ns in next_states: + states_actions[i] = (norm_prob[i]) + i += 1 + + if len(next_states) == 3: + if tx ==fx + 1 and ty== 1: + return states_actions[0] + elif tx == fx and ty == fy + 1: + return states_actions[1] + elif tx == fx and ty == fy: + return states_actions[2] + else: + return 0 + elif len(next_states) == 2: + if tx == fx and ty == fy + 1: + return states_actions[0] + elif tx == fx and ty == fy: + return states_actions[1] + else: + return 0 + else: + return 1.0 + + + + def state_features(self): + """ + Return the feature matrix assigning each state with an individual + feature (i.e. an identity matrix of size n_states * n_states). + + Rows represent individual states, columns the feature entries. + + Args: + world: A GridWorld instance for which the feature-matrix should be + computed. + + Returns: + The coordinate-feature-matrix for the specified world. + """ + return np.identity(self.n_states) + + def assistive_feature(self, trajs): + """ + Generate a Nx3 feature map for gridword 1:distance from start state, distance from goal, react_time + :param gw: GridWord + :param trajs: generated by the expert + :return: Nx3 feature map + """ + max_attempt = self.n_solution*self.n_attempt + max_time = self.timeout*self.n_solution*self.n_attempt + N = self.n_solution * self.n_attempt * self.n_user_action + + + feat = np.ones([N, 4]) + feat_trajs = np.zeros([N, 4]) + t = 0 + for traj in trajs: + for s1, a, s2 in traj._t: + ix, iy, iz = self.state_index_to_point(s1) + feat[s1, 0] = 0 + feat[s1, 1] = 0#max_attempt*ix + feat[s1, 2] = ix + feat[s1, 3] = iy + + # for trans in traj: + # if trans[0] == i: + # current_react_time = trans[2] + # feat[i, 2] = current_react_time + feat_trajs += (feat) + t += 1 + return feat_trajs / len(trajs) + + def coordinate_features(self, world): + """ + Symmetric features assigning each state a vector where the respective + coordinate indices are nonzero (i.e. a matrix of size n_states * + world_size). + + Rows represent individual states, columns the feature entries. + + Args: + world: A GridWorld instance for which the feature-matrix should be + computed. + + Returns: + The coordinate-feature-matrix for the specified world. 
+        """
+        features = np.zeros((world.n_states, world.n_solution))
+
+        for s in range(world.n_states):
+            x, y, _ = world.state_index_to_point(s)
+            features[s, x - 1] += 1
+            features[s, y - 1] += 1
+
+        return features
+
diff --git a/cognitive_game_vars.py b/cognitive_game_vars.py
new file mode 100644
index 0000000000000000000000000000000000000000..3571ef6dcd2accbedbd8fb00851c12d4e4b54a40
--- /dev/null
+++ b/cognitive_game_vars.py
@@ -0,0 +1,81 @@
+'''This module collects all the variables involved in the Bayesian network and initialises them.'''
+import enum
+
+class User_React_time(enum.Enum):
+    slow = 0
+    normal = 1
+    fast = 2
+    name = "user_react_time"
+    counter = 3
+
+class User_Capability(enum.Enum):
+    very_mild = 0
+    mild = 1
+    severe = 2
+    name = "user_capability"
+    counter = 3
+
+class User_Action(enum.Enum):
+    correct = 0
+    wrong = 1
+    timeout = 2
+    name = "user_action"
+    counter = 3
+
+class Reactivity(enum.Enum):
+    slow = 0
+    medium = 1
+    fast = 2
+    name = "reactivity"
+    counter = 3
+
+class Memory(enum.Enum):
+    low = 0
+    medium = 1
+    high = 2
+    name = "memory"
+    counter = 3
+
+class Attention(enum.Enum):
+    low = 0
+    medium = 1
+    high = 2
+    name = "attention"
+    counter = 3
+
+class Robot_Assistance(enum.Enum):
+    lev_0 = 0
+    lev_1 = 1
+    lev_2 = 2
+    lev_3 = 3
+    lev_4 = 4
+    lev_5 = 5
+    name = "robot_assistance"
+    counter = 6
+
+class Robot_Feedback(enum.Enum):
+    yes = 1
+    no = 0
+    name = "robot_feedback"
+    counter = 2
+
+class Game_State(enum.Enum):
+    beg = 0
+    middle = 1
+    end = 2
+    name = "game_state"
+    counter = 3
+
+class Attempt(enum.Enum):
+    at_1 = 0
+    at_2 = 1
+    at_3 = 2
+    at_4 = 3
+    name = "attempt"
+    counter = 4
+
+# test the module
diff --git a/environment.py b/environment.py
new file mode 100644
index 0000000000000000000000000000000000000000..46327927299a4adb909db8c3b43f1f3ca3e82036
--- /dev/null
+++ b/environment.py
@@ -0,0 +1,208 @@
+"""
+This module contains the definition of the current assistive domain.
+"""
+
+import itertools
+import numpy as np
+
+from episode import Episode
+
+
+class Environment:
+    def __init__(self, action_space, initial_state, goal_states, user_actions, states, rewards,
+                 task_complexity=3, task_length=10, n_max_attempt_per_object=5,
+                 timeout=20, n_levels_assistance=5):
+
+        self.action_space = action_space
+        self.initial_state = initial_state
+        self.goal_states = goal_states
+        self.user_actions = user_actions
+        self.rewards = rewards
+        self.states = self.get_states(states)
+        self.task_complexity = task_complexity
+        self.task_length = task_length
+        self.task_progress = 1
+        self.n_max_attempt_per_object = n_max_attempt_per_object
+        self.n_attempt_per_object = 1
+        self.n_total_attempt = 1
+        self.timeout = timeout
+        self.n_levels_assistance = n_levels_assistance
+
+    def reset(self):
+        self.task_progress = 1
+        self.n_attempt_per_object = 0
+        self.n_total_attempt = 0
+
+    def get_actions_space(self):
+        return self.action_space
+
+    def get_states_list(self):
+        return self.states
+
+    def get_states(self, states):
+        states_list = list(itertools.product(*states))
+        # states_list.insert(0, self.initial_state)
+        return states_list
+
+    def point_to_index(self, point):
+        return self.states.index(tuple(point))
+
+    def get_initial_state(self):
+        return self.initial_state
+
+    def is_goal_state(self, state):
+        return state in self.goal_states
+
+    def get_task_length(self):
+        return self.task_length
+
+    def get_task_level(self):
+        return self.task_level
+
+    def get_task_progress(self):
+        return self.task_progress
+
+    def get_n_objects(self):
+        return self.n_objects
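+
+    # A state is encoded throughout this module as the tuple
+    # (game_progress, attempt_number, previous_user_action), where the previous
+    # user action is -1 for a wrong move, 0 for a timeout and 1 for a correct
+    # move (see step() below).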
+ + def get_n_attempt_per_object(self): + return self.n_attempt_per_object + + def get_n_total_attempt(self): + return self.n_total_attempt + + def get_n_max_attempt_per_object(self): + return self.n_max_attempt_per_object + + def get_n_levels_assistance(self): + return self.n_levels_assistance + + def get_timeout(self): + return self.timeout + + def set_task_length(self, other): + self.task_length = other + + def set_task_level(self, other): + self.task_level = other + + def set_task_progress(self, other): + self.task_progress = other + + def set_n_objects(self, other): + self.n_objects = other + + def set_n_attempt_per_object(self, other): + self.n_attempt_per_object = other + + def set_n_total_attempt(self, other): + self.n_total_attempt = other + + def set_n_max_attempt_per_object(self, other): + self.n_max_attempt_per_object = other + + def set_timeout(self, other): + self.timeout = other + + def gen_transition_matrix(self, episodes, state_space_list, action_space_list, final_state_list, + path_trans_matrix_occ, + path_trans_matrix_prob): + ''' + This function generate the transition matrix function + Args: + episodes: the list of episodes + state_space_list: the list of states + action_space_list: the list of actions + final_state_list: the list of final states + path_trans_matrix_occ: the file with the occ matrix + path_trans_matrix_prob: the file with the matrix prob + Return: + the transition matrix probabilities + ''' + trans_matrix = Episode.generate_statistics(episodes, state_space_list, action_space_list) + # save the episode on a file + with open(path_trans_matrix_occ, "ab") as f: + np.save(f, trans_matrix) + f.close() + trans_matrix_occ = Episode.read(path_trans_matrix_occ) + trans_matrix_prob = Episode.compute_probabilities(trans_matrix_occ, final_state_list) + # save the episode on a file + with open(path_trans_matrix_prob, "ab") as f: + np.save(f, trans_matrix_prob) + f.close() + return trans_matrix_prob + + + def step(self, current_state, outcome, assistance_level, reward_matrix): + '''This function compute the next state and the current reward + Args: + outcome: is the outcome of the user's action + assistance_level: is the level of assistance given by the robot + reward_matrix: if there is a file then use it for as reward + Returns: + :next_state + :reward + :done if reached the final state + ''' + + reward = 0 + done = False + user_action = 0 + + if self.task_progress >= 0 and self.task_progress <= self.task_length / 3: + self.task_complexity = 1 + elif self.task_progress > self.task_length / 3 and self.task_progress <= 2 * self.task_length / 3: + self.task_complexity = 2 + elif self.task_progress > 2 * self.task_length / 3 and self.task_progress < self.task_length: + self.task_complexity = 3 + elif self.task_progress == self.task_length: + self.task_complexity = 4 + + + if outcome[0] == "max_attempt": + user_action = 1 + reward = -1 + self.n_attempt_per_object = 1 + self.n_total_attempt += 1 + self.task_progress += 1 + + elif outcome[0] == "wrong_action": + user_action = -1 + reward = 0 # * (self.task_progress) * (self.n_attempt_per_object+1) * (assistance_level+1) + self.n_attempt_per_object += 1 + self.n_total_attempt += 1 + + elif outcome[0] == "timeout": + user_action = 0 + reward = -1 + self.n_attempt_per_object += 1 + self.n_total_attempt += 1 + elif outcome[0] == "correct_action": + user_action = 1 + reward = 0.3 * (self.n_levels_assistance - assistance_level - 1) / (self.n_levels_assistance) + 0.3 * ( + self.n_max_attempt_per_object - 
self.n_attempt_per_object + 1) / (self.n_max_attempt_per_object) + 0.3 * (
+                    self.task_length - self.task_progress) / (self.task_length) + 0.1 * (
+                    30 - outcome[1]) / 30
+            self.n_attempt_per_object = 1
+            self.n_total_attempt += 1
+            self.task_progress += 1
+        else:
+            raise ValueError("The outcome index is not valid, please check the documentation")
+
+        next_state = (self.task_complexity, self.n_attempt_per_object, user_action)  # , self.task_level)
+
+        if self.is_goal_state(next_state):
+            done = True
+
+        if reward_matrix != "":
+            reward = reward_matrix(current_state, assistance_level, next_state)
+
+        return next_state, reward, done
+
+    def __str__(self):
+        return "action_space [" + str(self.action_space)[1:-1] + "] states [" + str(len(self.states)) + \
+               "] task_length: " + str(self.task_length) + " max_attempt: " + str(self.n_max_attempt_per_object) + \
+               " n_levels_assistance: " + str(self.n_levels_assistance)
+
diff --git a/episode.py b/episode.py
new file mode 100644
index 0000000000000000000000000000000000000000..e711c024122770a42b719d7418949471f2816eb6
--- /dev/null
+++ b/episode.py
@@ -0,0 +1,218 @@
+"""
+Episodes representing expert demonstrations and automated generation
+thereof.
+"""
+
+import numpy as np
+from itertools import chain
+import itertools
+import os
+import time
+import sys
+
+
+class Episode:
+    """
+    An episode consisting of states, corresponding actions, and outcomes.
+
+    Args:
+        transitions: The transitions of this episode as an array of
+            tuples `(state_from, action, state_to)`. Note that `state_to` of
+            an entry should always be equal to `state_from` of the next
+            entry.
+    """
+
+    def __init__(self, states=()):
+        self._t = list()
+        for s in states:
+            self._t.append(tuple(s))
+
+    def transition(self, state_from, action, state_to):
+        self._t.append((state_from, action, state_to))
+
+    def transitions(self):
+        """
+        The transitions of this episode.
+
+        Returns:
+            All transitions in this episode as array of tuples
+            `(state_from, action, state_to)`.
+        """
+        return self._t
+
+    def states(self):
+        """
+        The states visited in this episode.
+
+        Returns:
+            All states visited in this episode as iterator in the order
+            they are visited. If a state is being visited multiple times,
+            the iterator will return the state multiple times according to
+            when it is visited.
+ """ + return map(lambda x: x[0], chain(self._t, [(self._t[-1][2], 0, 0)])) + + def __repr__(self): + return "EpisodeGenerator({})".format(repr(self._t)) + + def __str__(self): + return "{}".format(self._t) + + + + + + def get_states(self, states, initial_state): + states_list = list(itertools.product(*states)) + states_list.insert(0, initial_state) + return states_list + + + def state_from_point_to_index(self, states, point): + return states.index(tuple(point)) + + + + def state_from_index_to_point(self, state_tuple, index): + return state_tuple[index] + + def load_episodes(self, file): + ''' + It returns the episodes related to the saved file + :param file: + :param episode: look at main.py + :param sol_per_pop: look at main.py + :return: a list of episodes + ''' + print("LOADING...") + + trajs = list() + with open(file, "rb") as f: + traj = np.load(f, allow_pickle=True) + for t in range(len(traj)): + trajs.append(Episode(traj[t])) + print("loaded traj ", t) + f.close() + for t in trajs: + print(t._t) + return trajs + + + def generate_statistics(self, state_list, action_space, episodes): + ''' + This function computes the state x state x action matrix that + corresponds to the transition table we will use later + ''' + print(state_list) + n_states = len(state_list) + n_actions = len(action_space) + + #create a matrix state x state x action + table = np.zeros(shape=(n_states, n_states, n_actions)) + start_time = time.time() + s1, s2, a = range(n_states), range(n_states), range(n_actions) + for s_from in s1: + for act in a: + for s_to in s2: + #convert to coord + s_from_coord = self.state_from_index_to_point(state_list, s_from) + s_to_coord = self.state_from_index_to_point(state_list, s_to) + #print("from:", s_from_coord," to:", s_to_coord) + #print() + for traj in episodes: + if (s_from, act, s_to) in traj._t: + table[s_from, s_to, act] += 1 + elapsed_time = time.time()-start_time + print("processing time:{}".format(elapsed_time)) + return table + + + def compute_probabilities(self, transition_matrix, terminal_state, state_space): + """ + We compute the transitions for each state_from -> action -> state_to + :param transition_matrix: matrix that has shape n_states x n_states x action + :return: + """ + n_state_from, n_state_to, n_actions = transition_matrix.shape + transition_matrix_with_prob = np.zeros((n_state_from, n_state_to, n_actions)) + + for s_from in range(n_state_from): + s_in_prob = list() + sum_over_prob = 0 + #get the episode from s_from to all the possible state_to given the 5 actions + #get all the occurrence on each column and compute the probabilities + #remember for each column the sum of probabilities has to be 1 + for a in range(n_actions): + trans_state_from = list(zip(*transition_matrix[s_from]))[a] + #needs to be done to avoid nan (0/0) + + sum_over_prob = sum(trans_state_from) if sum(trans_state_from)>0 else sys.float_info.min + + s_in_prob.append(list(map(lambda x: x/sum_over_prob, trans_state_from))) + + transition_matrix_with_prob[s_from][:][:] = np.asarray(s_in_prob).T + + for state in terminal_state: + state_idx = self.state_from_point_to_index(state_space, state) + transition_matrix_with_prob[state_idx][state_idx][0] = 1 + + return transition_matrix_with_prob + + + def read_transition_matrix(self, file): + print("Loading trans matrix...") + fileinfo = os.stat(file) + trans_matrix = list() + with open(file, "rb") as f: + trans_matrix = np.load(f, allow_pickle=True) + + #trans_matrix_reshaped = np.asarray(trans).reshape(n_states, n_states, n_actions) + 
print("Done")
+        return trans_matrix
+
+
+def main():
+
+    file_path = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/episodes.npy"
+    ep = Episode()
+    episodes = ep.load_episodes(file_path)
+    initial_state = (1, 1, 0)
+    n_max_attempt = 5
+    task_length = 6
+    # Environment setup for RL agent assistance
+    action_space = ['LEV_0', 'LEV_1', 'LEV_2', 'LEV_3', 'LEV_4', 'LEV_5']
+    user_actions_state = [-1, 0, 1]
+    final_states = [(task_length, a, u) for a in range(1, n_max_attempt) for u in range(-1, 2)]
+    # definition of the state space
+    attempt = [i for i in range(1, n_max_attempt)]
+    game_state = [i for i in range(1, task_length + 1)]
+    user_actions = list(user_actions_state)
+    states_space = (game_state, attempt, user_actions)  # , task_levels)
+
+    # no precomputed reward matrix is used here, so rewards is left empty
+    env = Environment(action_space, initial_state, final_states, user_actions, states_space,
+                      rewards=None, task_length=task_length, n_max_attempt_per_object=n_max_attempt,
+                      timeout=0, n_levels_assistance=6)
+
+    trans_matrix = ep.generate_statistics(env.states, env.action_space, episodes)
+    path_trans_matrix_occ = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/trans_matrix_occ.npy"
+    path_trans_matrix_prob = "/home/aandriella/Documents/Codes/MY_FRAMEWORK/BN_GenerativeModel/results/1/trans_matrix_prob.npy"
+
+    # save the occurrence matrix on a file
+    with open(path_trans_matrix_occ, "ab") as f:
+        np.save(f, trans_matrix)
+    trans_matrix_occ = ep.read_transition_matrix(path_trans_matrix_occ)
+    print(trans_matrix_occ.shape)
+    trans_matrix_prob = ep.compute_probabilities(trans_matrix_occ, final_states, env.states)
+    # save the probability matrix on a file
+    with open(path_trans_matrix_prob, "ab") as f:
+        np.save(f, trans_matrix_prob)
+
+    #prob = read_trans_matrix(path_trans_matrix_prob, 0, 0)
+
+
+if __name__ == "__main__":
+    main()
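
For reference, a minimal usage sketch (not part of the diff) of how the CognitiveGame MDP defined in cognitive_game_env.py could be instantiated from the same kind of expert episodes that main() loads. The state-space bounds mirror the ones used above; the episodes path and the timeout value are placeholder assumptions.

    import itertools
    from episode import Episode
    from cognitive_game_env import CognitiveGame

    ep = Episode()
    episodes = ep.load_episodes("episodes.npy")   # placeholder path
    task_length, n_max_attempt = 6, 5
    action_space = ['LEV_0', 'LEV_1', 'LEV_2', 'LEV_3', 'LEV_4', 'LEV_5']
    user_actions = [-1, 0, 1]
    # every (game_progress, attempt, previous_user_action) combination
    state_space = list(itertools.product(range(1, task_length + 1),
                                         range(1, n_max_attempt),
                                         user_actions))
    initial_state = (1, 1, 0)
    terminal_states = [(task_length, a, u) for a in range(1, n_max_attempt) for u in user_actions]

    game = CognitiveGame(initial_state, terminal_states, task_length, n_max_attempt,
                         action_space, state_space, user_actions,
                         timeout=20, episode_list=episodes)
    # intended successor of the initial state under assistance level 0
    print(game.state_index_transition(game.initial_state_indexed, a=0))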