main.py 12.00 KiB
#!/usr/bin/python
'''
This is the main class for running the entire game framework
'''
#import modules and classes
#import from libraries
import enum
import random
import time
#import from ros
import rospy
from board_state.msg import TokenMsg
from board_state.msg import BoardMsg
user_picked = (0, 0)
user_placed = (0, 0)
class Game(object):
def __init__(self):
rospy.init_node('big_hero', anonymous=True)
# subscriber for getting info from the board
rospy.Subscriber("/detected_move", TokenMsg, self.get_move_event_callback)
rospy.Subscriber("/board_status", BoardMsg, self.get_board_event_callback)
# get the objective of the exercise
self.current_board = []
rospy.sleep(2)
self.initial_board = self.get_board_event()
self.objective = rospy.get_param("/objective")
self.solution = self.set_objective(5)
self.n_attempt_per_token = 1
self.n_max_attempt_per_token = 5#n_max_attempt
self.n_mistakes = 0
self.n_solution = 10#n_solution
self.n_correct_move = 0
self.detected_token = []
self.picked = False
self.placed = False
self.moved_back = False
self.react_time_per_token_t0 = 0
self.react_time_per_token_t1 = 0.0
self.elapsed_time_per_token_t0 = 0.0
self.elapsed_time_per_token_t1 = 0.0
self.total_elapsed_time = 0
self.move_info = {'token_id':'', 'from':'', 'to':'', 'robot_assistance':'', 'react_time':'', 'elapsed_time':''}
self.move_info_vect = list()
#log info
def get_board_event_callback(self, msg):
'''callback from the topic board_status to get the status of the current_board'''
self.current_board = msg.data
def get_board_event(self):
'''This method returns what is listened by the subscriber'''
return self.current_board
def get_move_event_callback(self, msg):
'''callback from the topic detected_move to get the detected move if so'''
self.detected_token = msg.detected_token
self.picked = msg.picked
self.placed = msg.placed
self.moved_back = msg.moved_back
def get_move_event(self):
'''This method just returns what is listened by the subscriber'''
return self.detected_token, self.picked, self.placed, self.moved_back
def set_objective(self, n_token):
'''The method return a list with the tokens ordered based on the solution of the exercise'''
board_ = self.current_board[:]
#remove the empty cells from the current_board
board_filtered = list(filter(lambda x: x!="0", board_))
if self.objective == "ascending":
self.solution = sorted(board_filtered)[:n_token]
elif self.objective == "descending":
self.solution = sorted(board_filtered, reverse=False)[:n_token]
else:
assert("Game is not defined contact the developer for integrating it")
return self.solution
#methods to access the variables outside the class
def get_n_attempt_per_token(self):
return self.n_attempt_per_token
def get_n_max_attempt(self):
return self.n_max_attempt
def get_n_mistakes(self):
return self.n_mistakes
def get_n_solution(self):
return self.n_solution
def get_n_correct_move(self):
return self.n_correct_move
def reset_attempt_per_token(self):
self.n_attempt_per_token = 0
def set_n_attempt_per_token(self, value):
self.n_attempt_per_token = value
def set_n_mistakes(self, value):
self.n_mistakes = value
def set_n_correct_move(self, value):
self.n_correct_move = value
def set_n_solution(self, value):
self.n_solution = value
def set_n_max_attempt_per_token(self, value):
self.n_max_attempt_per_token = value
class StateMachine(enum.Enum):
#def __init__(self, e):
#initialise the node and the subscriber
S_ROBOT_ASSIST = 1
S_USER_ACTION = 2
S_USER_PICK_TOKEN = 3
S_ROBOT_FEEDBACK = 4
S_USER_PLACE = 5
S_USER_PLACE_TOKEN_BACK = 6
S_USER_PLACE_TOKEN_SOL = 7
S_USER_MOVE_TOKEN_BACK = 8
S_ROBOT_MOVE_TOKEN_BACK = 9
S_ROBOT_MOVE_CORRECT_TOKEN = 10
S_ROBOT_OUTCOME = 11
CURRENT_STATE = 1
b_robot_assist_finished = False
b_robot_feedback_finished = False
b_user_picked_token = False
b_user_placed_token_back = False
b_user_placed_token_sol = False
b_robot_outcome_finised = False
b_robot_moved_token_back = False
b_user_moved_token_back = False
b_robot_moved_correct_token = False
b_user_reached_max_attempt = False
def robot_provide_assistance(self, game):
'''
Robot action of assistance combining speech and gesture
:return: True when the action has been completed
'''
print("R_ASSISTANCE")
game.move_info['robot_assistance'] = random.randint(0,4)
rospy.sleep(2.0)
self.b_robot_assist_finished = True
self.CURRENT_STATE = self.S_USER_ACTION
return self.b_robot_assist_finished
def robot_provide_feedback(self):
'''
The robot provides a feddback on the grasped token
:return:
'''
print("R_FEEDBACK")
self.b_robot_feedback_finished = True
self.CURRENT_STATE = self.S_USER_PLACE
return self.b_robot_feedback_finished
def robot_provide_outcome(self, game):
'''
Robot provides the user with the outcome of their move
:param self:
:return:
'''
print("R_OUTCOME")
outcome = 0
#get current move and check if it is the one expeceted in the solution list
if game.detected_token[0] == game.solution[game.n_correct_move] \
and game.detected_token[2] == str(game.solution.index(game.detected_token[0])+1):
outcome = 1
game.n_correct_move += 1
game.n_attempt_per_token = 1
game.set_n_correct_move(game.n_correct_move)
game.set_n_attempt_per_token(game.n_attempt_per_token)
print("correct_solution ", game.get_n_correct_move())
self.CURRENT_STATE = self.S_ROBOT_ASSIST
elif game.detected_token[0] == []:
outcome = 0
print("timeout")
game.n_mistakes += 1
game.n_attempt_per_token += 1
game.set_attempt_per_token(game.n_attempt_per_token)
game.set_n_mistakes(game.n_mistakes)
self.CURRENT_STATE = self.S_ROBOT_ASSIST
# check if the user reached his max number of attempts
if game.n_attempt_per_token >= game.n_max_attempt:
self.S_ROBOT_MOVE_CORRECT_TOKEN = True
self.b_user_reached_max_attempt = True
self.robot_move_correct_token()
elif game.detected_token[0] != game.solution[game.n_correct_move] \
or game.detected_token[2] != game.solution.index(game.detected_token[0])+1:
outcome = -1
print("wrong_solution")
game.n_mistakes += 1
game.n_attempt_per_token += 1
game.set_n_attempt_per_token(game.n_attempt_per_token)
game.set_n_mistakes(game.n_mistakes)
self.CURRENT_STATE = self.S_ROBOT_MOVE_TOKEN_BACK
self.user_move_back(game)
#check if the user reached his max number of attempts
if game.n_attempt_per_token>=game.n_max_attempt_per_token:
self.S_ROBOT_MOVE_CORRECT_TOKEN = True
self.b_user_reached_max_attempt = True
self.robot_move_correct_token(game)
self.b_robot_outcome_finished = True
return self.b_robot_outcome_finished, outcome
def robot_move_back(self):
#user moved the token in an incorrect location
#robot moved it back
print("Robot moved back the token")
self.CURRENT_STATE = self.S_ROBOT_ASSIST
self.b_robot_moved_token_back = True
return self.b_robot_moved_token_back
def robot_move_correct_token(self, game):
print("Robot moves the correct token as the user reached the max number of attempts")
#get the current solution
token = game.solution[game.n_correct_move]
_from = game.initial_board.index(token)
_to = game.solution.index(token)+1
press_key = raw_input("Please move the token {} at loc {}".format(token, _to))
game.set_n_correct_move(game.get_n_correct_move()+1)
game.set_n_attempt_per_token(1)
def user_move_back(self, game):
#user moved the token in an incorrect location
#robot moved it back
print("User moved back the token")
#get the initial location of the placed token and move back there
token, _to, _from = game.detected_token
while(game.detected_token!=[token, _from, _to]):
pass
self.CURRENT_STATE = self.S_ROBOT_ASSIST
self.b_user_moved_token_back = True
return self.b_user_moved_token_back
def user_action(self, game):
''' Dispach user action'''
'''We wait until the user pick a token'''
print("U_ACTION")
def user_pick_token(sm, game):
game.react_time_per_token_t0 = time.time()
print("U_PICK")
sm.CURRENT_STATE = sm.S_ROBOT_FEEDBACK
detected_token, picked, _, _ = game.get_move_event()
while(not picked):
detected_token, picked, _, _ = game.get_move_event()
game.elapsed_time_per_token_t0 = time.time()
game.react_time_per_token_t1 = time.time()-game.react_time_per_token_t0
game.move_info['react_time'] = game.react_time_per_token_t1
game.move_info['token_id'], game.move_info['from'] = game.detected_token[0], game.detected_token[1]
sm.user_picked_token = True
user_picked = detected_token
return sm.user_picked_token
def robot_provide_feedback(sm):
print("R_FEEDBACK")
sm.CURRENT_STATE = sm.S_USER_PLACE
sm.robot_provided_feeback_finished = True
return sm.robot_provided_feeback_finished
''' When the user picked the token we check where they place it'''
'''either they can place it back '''
def user_place(sm, game):
print("U_PLACE")
#check where the user will place the token
def user_place_token_back(sm):
print("U_PLACE_BACK")
sm.CURRENT_STATE = sm.S_USER_ACTION
sm.b_user_placed_token_back = True
game.elapsed_time_per_token_t1 = time.time()-game.elapsed_time_per_token_t0
game.move_info['to'] = game.detected_token[2]
game.move_info['elapsed_time'] = game.elapsed_time_per_token_t1
return sm.b_user_placed_token_back
'''or they can place it in the solution row'''
def user_place_token_sol(sm):
print("U_PLACE_SOL")
sm.CURRENT_STATE = sm.S_ROBOT_OUTCOME
sm.b_user_placed_token_sol = True
game.elapsed_time_per_token_t1 = time.time() - game.elapsed_time_per_token_t0
game.move_info['to'] = game.detected_token[2]
game.move_info['elapsed_time'] = game.elapsed_time_per_token_t1
return sm.b_user_placed_token_sol
'''return where the user placed the token'''
#here we get the token that has been placed and trigger one or the other action
detected_token, picked, placed, moved_back = game.get_move_event()
while (not placed):
detected_token, _, placed, moved_back = game.get_move_event()
if (placed and moved_back):
user_place_token_back(sm)
self.CURRENT_STATE = sm.S_USER_ACTION
elif (placed and not moved_back):
user_place_token_sol(sm)
self.CURRENT_STATE = sm.S_ROBOT_OUTCOME
else:
assert "Unexpected state"
self.CURRENT_STATE = self.S_USER_ACTION
if user_pick_token(self, game):
if robot_provide_feedback(self):
if user_place(self, game):
return True
else:
self.CURRENT_STATE = self.S_USER_PICK_TOKEN
else:
self.CURRENT_STATE = self.S_USER_ACTION
def num_to_func_to_str(self, argument):
switcher = {
0: self.robot_provide_assistance,
1: self.robot_provide_feedback,
2: self.user_action,
3: self.robot_provide_outcome,
4: self.robot_move_back,
5: self.user_move_back,
6: self.robot_move_correct_token
}
# get the function based on argument
func = switcher.get(argument)
# Execute the function
return func()
def main():
game = Game()
game.set_n_solution(5)
game.set_n_max_attempt_per_token(4)
current_board = game.get_board_event()
#game.solution = game.set_objective(n_solution)
sm = StateMachine(1)
while game.get_n_correct_move()<game.n_solution:
if sm.CURRENT_STATE.value == sm.S_ROBOT_ASSIST.value:
sm.robot_provide_assistance(game)
elif sm.CURRENT_STATE.value == sm.S_USER_ACTION.value:
time_to_act = time.time()
sm.user_action(game)
game.total_elapsed_time += time.time()-time_to_act
elif sm.CURRENT_STATE.value == sm.S_ROBOT_OUTCOME.value:
sm.robot_provide_outcome(game)
print("game_state:", game.get_n_correct_move(), "n_attempt_token:", game.get_n_attempt_per_token())
print("react time: ", game.react_time_per_token_t1, "elapsed time: ", game.elapsed_time_per_token_t1)
game.move_info_vect.append(game.move_info.copy())
print(game.move_info)
print("correct_move ", game.get_n_correct_move,
" n_mistakes ", game.n_mistakes)
for instance in game.move_info_vect:
print(instance)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
pass