Greed World Problem

  • \(\mathcal{S} = \{[0,0],[0,1],...,[3,3]\}\)
  • \(\mathcal{A}(s) = \{\text{up,down,right,left}\}\)
  • \(R_t = -1\)
  • \(\gamma = 1\)
  • Deterministically cause the corresponding state transitions
  • The actions that would take the agent off the grid in fact leave the state unchanged.
  • undiscounted, episodic task

True Value function \(v_\pi\)

  • \(\forall s,\forall a : \pi(a|s) = 0.25\)

import numpy as np
import time
np.array ([[  0,-14,-20 ,-22],
           [-14, -18, -20, -20],
           [-20, -20, -18, -14.],
           [-22, -20, -14,   0.]])
array([[  0., -14., -20., -22.],
       [-14., -18., -20., -20.],
       [-20., -20., -18., -14.],
       [-22., -20., -14.,   0.]])

Optimal action value function, \(q_{\star}\)

Set up

class Environment():
    def __init__(self,grid_size):
        self.grid_size = np.array(grid_size)
        self.S = [np.array([i,j]) for i in range(grid_size[0]) for j in range(grid_size[1])]
        self.Terminal_states = [(0,0),(self.grid_size[0]-1,self.grid_size[1]-1)]
        self.A = ["W","E","N","S"]
        self.A_to_coord = {"W" : [0,-1],"E" : [0,1], "N" : [-1,0], "S" : [1,0]}
        for k,v in self.A_to_coord.items():
            self.A_to_coord[k] = np.array(v)
        self.gamma = 1
        self.R = -1
    def move(self,s,a):
        s : current_state(position)
        a : action(from deterministic policy π)
        s_next : state after one_step transition
        Input : s = [0,1],a = "W"
        Output : s_next = [0,0]
        s_next = s + self.A_to_coord[a]
        if s_next[0] < 0 or s_next[1] <0 or s_next[0] >= self.grid_size[0] or s_next[1] >= self.grid_size[1]:
            return s # |S|를 넘어갈 경우, 원래 상태를 반환
            return s_next
    def move_test(self):
        S = [np.array([i,j]) for i in [0,1,2,3] for j in [0,1,2,3]]
        for s in S:
            for a in self.A:
    def dynamics(self,s_prime,r,s,a):
        s : current_state(position)
        a : action(from deterministic policy π)
        s_prime : all of the possible states after one-step transition
        r : immediate reward(-1)
        0 if s에서 a로 움직였을때의 변화된 상태 next_s와 s_prime의 input이 다를때
        1 if s에서 a로 움직였을때의 변화된 상태 next_s와 s_prime의 input이 같을때
        즉, a방향으로 움직이면 반드시 a방향으로 감(deterministic)
        s_next = self.move(s,a)
        if np.sum(s_next != s_prime)>=1:
            return 0
            return 1
    def dynamics_test(self,check_p = 1):
        for s in self.S:
            for a in self.A:
                for s_prime in self.S: #가능한 모든 next_state
                    if self.dynamics(s_prime,r,s,a) == check_p:
                        print(f"state : {s} action : {a} s_prime : {s_prime} dynamics : {self.dynamics(s_prime,r,s,a)}")
    def sampling_action(self,s_t,π):
        Input current state s_t : tuple
        ouput action : str (a ~ π(a|s))
        actions = []
        prob = []
        for a,p in π[s_t].items():
        return np.random.choice(a=actions,p=prob)
    def generate_π(self,uniform=False):
        Input : NA
        Output : {(x,y) : {"W" : pr1,"E" : pr2 ,...}}
        {(0,0):{"W" : 0.1, "E" : 0.2, ...},(0,1):{"W":0.4,"E":0.5...}}
        π = {(i,j): {} for i in range(self.grid_size[0]) for j in range(self.grid_size[1])}
        for t in π.values():
            unnormalized_prob = np.random.rand(4)
            if uniform == False:
                prob = unnormalized_prob/np.sum(unnormalized_prob)
                prob = [0.25] * 4
            for i in range(len(self.A)):
                t[self.A[i]] = prob[i]
        return π
    def argmax_a_Q(self,Q,s):
        max_action = "W"
        max_value = -5000
        for visit,Q_val in Q.items():
            if visit[0] == s:
                if Q_val > max_value:
                    max_action = visit[1]
                    max_value = Q_val                    
        return max_action
class Q_learning(Environment):
    def __init__(self,grid_size=(4,4)):
    def control(self,s_0,iter_num,alpha,epsilon):
        Q = {(tuple(s),a) : 0 for s in self.S for a in self.A}
        π = self.generate_π()
        γ = 1
        for ep_num in range(iter_num):
            ϵ = epsilon ** (ep_num + 1)
            α = alpha ** (ep_num + 1)
            if ep_num % 1000 == True:
                print(f"epsilon : {ϵ} alpha : {α}")
            s_t = s_0
            while s_t not in self.Terminal_states:
                a_t = self.sampling_action(s_t,π)
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                Q[(s_t,a_t)] = Q[(s_t,a_t)] + α * (r + γ * Q[(s_prime,self.argmax_a_Q(Q,s_prime))] - Q[(s_t,a_t)])
                a_star = self.argmax_a_Q(Q,tuple(s_t))
                for (state,action),value in Q.items():
                    if action == a_star:
                        π[state][action] = 1 - ϵ + ϵ /len(self.A)
                        π[state][action] = ϵ/len(self.A)
                s_t = s_prime
        return π,Q



\[\begin{aligned} \alpha(s) = \frac{1}{N(s)} \end{aligned}\]
class TD(Environment):
    def __init__(self,grid_size=(4,4)):
    def prediction(self,s_0,π,iter_nums = 50):
        t = time.time()
        V = {tuple(s) : 0 for s in self.S}
        N = {tuple(s) : 0 for s in self.S}
        for ep_num in range(iter_nums):
            N[s_0] += 1
            s_t = s_0
            while s_t not in self.Terminal_states:
                a_t = self.sampling_action(s_t,π)
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                N[s_prime] += 1
                V[s_t] = V[s_t] + (1/N[s_t]) * (r + 1*V[s_prime] - V[s_t])
                s_t = s_prime
        print(f"lead time : {time.time()-t}")
        return N,V

td = TD()
π = td.generate_π(True)
N,V = td.prediction(s_0 = (2,1),π = π,iter_nums = 10000)
lead time : 3.84700870513916
  • It doesn’t converge to a true value.
  • I thought the number of iterations was too small, so I did some more.
class TD(Environment):
    def __init__(self,grid_size=(4,4)):
    def prediction(self,s_0,π,iter_nums = 50):
        t = time.time()
        V = {tuple(s) : 0 for s in self.S}
        N = {tuple(s) : 0 for s in self.S}
        for ep_num in range(iter_nums):
            N[s_0] += 1
            s_t = s_0
            while s_t not in self.Terminal_states:
                a_t = self.sampling_action(s_t,π)
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                N[s_prime] += 1
                V[s_t] = V[s_t] + (1/N[s_t]) * (r + 1*V[s_prime] - V[s_t])
                s_t = s_prime
        print(f"lead time : {time.time()-t}")
        return N,V

td = TD()
π = td.generate_π(True)
N,V = td.prediction(s_0 = (2,1),π = π,iter_nums = 1000000)
lead time : 376.06808495521545
  • Increasing the iteration actually increased the difference with the true value function.
  • It seems wrong to modify \(\alpha\) based on the number of returns like MC does.


\[\begin{aligned} \alpha = k \end{aligned}\]
class TD(Environment):
    def __init__(self,grid_size=(4,4)):
    def prediction(self,s_0,π,iter_nums,α=0.1):
        V = {tuple(s) : 0 for s in self.S}
        for ep_num in range(iter_nums):
            s_t = s_0
            while s_t not in self.Terminal_states:
                a_t = self.sampling_action(s_t,π)
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                V[s_t] = V[s_t] + α * (r + 1*V[s_prime] - V[s_t])
                s_t = s_prime
        return V
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 10000= 0.9)
  • It was still far from the true value, so I decided to reduce the alpha.
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 10000= 0.25)
  • It seemed to get a little closer to the true value at alpha=0.25.
  • So I wonder if it would converge if I increased the number of iterations here.
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 100000= 0.25)
  • It seemed to get a little more closer to the true value by increasing the number of iterations.
  • The intuition here is that by decreasing alpha more and more and increasing iterations, I can get closer to the true value.
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 100000= 0.025)
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 100000= 0.001)
  • It approximates the true value fairly well.
  • I wonder if increasing the number of iterations would increase the value? Is this really convergence?
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 500000= 0.001)
  • It seems to have converged.
  • Insight
    1. It was stated that decreasing \(\alpha\) will always converge to True. However, decreasing \(\frac{1}{N}\) did not converge to the true value. Perhaps we need to increase the decrease further.
    2. The book said that if \(\alpha\) is a constant, it will converge to True on average. In my experiments, as long as \(\alpha\) is appropriate, 100% of them converged to the true value.
    3. In TD, the step size alpha should be set small by default. I think it should be at most 0.01 and definitely less than that.

SARSA(On-Policy TD Control)

\[\begin{aligned} \pi(a|s) = \begin{cases} 1-\epsilon + \frac{\epsilon}{|A(s)|}\quad(A = \text{greedy action})\\ \epsilon /|A(s)| \quad\quad\quad\quad (\text{otherwise}) \end{cases} \end{aligned}\]
import numpy as np
import time
class Environment():
    def __init__(self,grid_size):
        self.grid_size = np.array(grid_size)
        self.S = [np.array([i,j]) for i in range(grid_size[0]) for j in range(grid_size[1])]
        self.Terminal_states = [(0,0),(self.grid_size[0]-1,self.grid_size[1]-1)]
        self.A = ["W","E","N","S"]
        self.A_to_coord = {"W" : [0,-1],"E" : [0,1], "N" : [-1,0], "S" : [1,0]}
        for k,v in self.A_to_coord.items():
            self.A_to_coord[k] = np.array(v)
        self.gamma = 1
        self.R = -1
    def move(self,s,a):
        s : current_state(position)
        a : action(from deterministic policy π)
        s_next : state after one_step transition
        Input : s = [0,1],a = "W"
        Output : s_next = [0,0]
        s_next = s + self.A_to_coord[a]
        if s_next[0] < 0 or s_next[1] <0 or s_next[0] >= self.grid_size[0] or s_next[1] >= self.grid_size[1]:
            return s # |S|를 넘어갈 경우, 원래 상태를 반환
            return s_next
    def move_test(self):
        S = [np.array([i,j]) for i in [0,1,2,3] for j in [0,1,2,3]]
        for s in S:
            for a in self.A:
    def dynamics(self,s_prime,r,s,a):
        s : current_state(position)
        a : action(from deterministic policy π)
        s_prime : all of the possible states after one-step transition
        r : immediate reward(-1)
        0 if s에서 a로 움직였을때의 변화된 상태 next_s와 s_prime의 input이 다를때
        1 if s에서 a로 움직였을때의 변화된 상태 next_s와 s_prime의 input이 같을때
        즉, a방향으로 움직이면 반드시 a방향으로 감(deterministic)
        s_next = self.move(s,a)
        if np.sum(s_next != s_prime)>=1:
            return 0
            return 1
    def dynamics_test(self,check_p = 1):
        for s in self.S:
            for a in self.A:
                for s_prime in self.S: #가능한 모든 next_state
                    if self.dynamics(s_prime,r,s,a) == check_p:
                        print(f"state : {s} action : {a} s_prime : {s_prime} dynamics : {self.dynamics(s_prime,r,s,a)}")
    def sampling_action(self,s_t,π):
        Input current state s_t : tuple
        ouput action : str (a ~ π(a|s))
        actions = []
        prob = []
        for a,p in π[s_t].items():
        return np.random.choice(a=actions,p=prob)
    def generate_π(self,uniform=False):
        Input : NA
        Output : {(x,y) : {"W" : pr1,"E" : pr2 ,...}}
        {(0,0):{"W" : 0.1, "E" : 0.2, ...},(0,1):{"W":0.4,"E":0.5...}}
        π = {(i,j): {} for i in range(self.grid_size[0]) for j in range(self.grid_size[1])}
        for t in π.values():
            unnormalized_prob = np.random.rand(4)
            if uniform == False:
                prob = unnormalized_prob/np.sum(unnormalized_prob)
                prob = [0.25] * 4
            for i in range(len(self.A)):
                t[self.A[i]] = prob[i]
        return π
    def argmax_a_Q(self,Q,s):
        max_action = "W"
        max_value = -5000
        for visit,Q_val in Q.items():
            if visit[0] == s:
                if Q_val > max_value:
                    max_action = visit[1]
                    max_value = Q_val                    
        return max_action
class SARSA(Environment):
    def __init__(self,grid_size=(4,4)):
    def control(self,s_0,iter_nums,α=0.9= 0.9995):
        Q = {(tuple(s),a):0 for s in self.S for a in self.A}
        π = self.generate_π()
        action_space_size = len(self.A)
        for ep_num in range(1,iter_nums):
            s_t = s_0;a = self.sampling_action(s_0,π)
            a_t = self.sampling_action(s_t,π)
            epsilon = ϵ #실수한 부분
            alpha = α #GLIE?
            if ep_num % 1000 == 0:
                print(f"episode : {ep_num}, epsilon : {epsilon}, alpha : {alpha}")
            while s_t not in self.Terminal_states:
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                #sampling action ~ ϵ-greedy action
                #1. make ϵ-greedy policy
                a_star = self.argmax_a_Q(Q,s_prime)
                for action,prob in π[s_prime].items():
                    if action == a_star:
                        π[s_prime][action] = 1 - epsilon + epsilon/action_space_size
                        π[s_prime][action] = epsilon/action_space_size
                #2. sampling action
                a_prime = self.sampling_action(s_prime,π)
                Q[(s_t,a_t)] = Q[(s_t,a_t)] + alpha*(r + 1*Q[(s_prime,a_prime)] - Q[(s_t,a_t)])
                #print(alpha*(r + 1*Q[(s_prime,a_prime)] - Q[(s_t,a_t)]))
                s_t = s_prime;a_t = a_prime
        return Q,π
def difference(q_π,Q):
    true_val = []
    approx_val = []
    for k,v in q_π.items():
    for k,v in Q.items():
    return ((np.array(true_val) - np.array(approx_val))**2)**(1/2)
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 10000= 0.9= 0.9)
rmse = difference(q_star,Q)
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 10000= 0.5= 0.9)
rmse = difference(q_star,Q)
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 10000= 0.9= 0.5)
rmse = difference(q_star,Q)
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 10000= 0.2= 0.5)
rmse = difference(q_star,Q)
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 10000= 0.5= 0.2)
rmse = difference(q_star,Q)
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 10000= 0.01= 0.2)
rmse = difference(q_star,Q)
  • Here I remembered that there are some conditions for the salsa algorithm to converge.
  • GLIE, Robbins-Monro sequence of step-size \(\alpha_t\)
(GLIE) (Robbins-Monro)
class SARSA(Environment):
    def __init__(self,grid_size=(4,4)):
    def control(self,s_0,iter_nums,α=0.9= 0.9995):
        Q = {(tuple(s),a):0 for s in self.S for a in self.A}
        π = self.generate_π()
        action_space_size = len(self.A)
        for ep_num in range(1,iter_nums):
            s_t = s_0;a = self.sampling_action(s_0,π)
            a_t = self.sampling_action(s_t,π)
            epsilon = ϵ ** (ep_num + 1) #실수한 부분
            alpha = α**(ep_num+1) #GLIE?
            if ep_num % (iter_nums / 10) == 0:
                print(f"episode : {ep_num}, epsilon : {epsilon}, alpha : {alpha}")
            while s_t not in self.Terminal_states:
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                #sampling action ~ ϵ-greedy action
                #1. make ϵ-greedy policy
                a_star = self.argmax_a_Q(Q,s_prime)
                for action,prob in π[s_prime].items():
                    if action == a_star:
                        π[s_prime][action] = 1 - epsilon + epsilon/action_space_size
                        π[s_prime][action] = epsilon/action_space_size
                #2. sampling action
                a_prime = self.sampling_action(s_prime,π)
                Q[(s_t,a_t)] = Q[(s_t,a_t)] + alpha*(r + 1*Q[(s_prime,a_prime)] - Q[(s_t,a_t)])
                #print(alpha*(r + 1*Q[(s_prime,a_prime)] - Q[(s_t,a_t)]))
                s_t = s_prime;a_t = a_prime
        return Q,π
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 10000= np.exp(np.log(0.0001)/10000),ϵ = 0.9999)
rmse = difference(q_star,Q)
  • Once again, I see that the SARSA algorithm will not converge if the GLIE condition is not met.
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 10000= np.exp(np.log(0.0001)/100000),ϵ = 0.999)
rmse = difference(q_star,Q)
  • Once again, I see that the SARSA algorithm will not converge if the Robbins-Monro condition is not met.
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 100000= np.exp(np.log(0.0001)/100000),ϵ = 0.9999)
rmse = difference(q_star,Q)
  • Here, I see that the SARSA algorithm gradually converges to the GLIE and Robbins-monro conditions.
  • I’ve also seen that by increasing the number of iterations here, I can approach the optimal action value function and policy
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 1000000= np.exp(np.log(0.0001)/1000000),ϵ = 0.99999)
rmse = difference(q_star,Q)
  • It seems to have converged.
  • Insight
    1. The SARSA algorithm converges only when the GLIE and Robbins-monro conditions are satisfied.
    2. It is very slow to converge.


class Q_learning(Environment):
    def __init__(self,grid_size=(4,4)):
    def control(self,s_0,iter_num,alpha,epsilon):
        Q = {(tuple(s),a) : 0 for s in self.S for a in self.A}
        π = self.generate_π()
        γ = 1
        for ep_num in range(iter_num):
            ϵ = epsilon ** (ep_num + 1)
            α = alpha ** (ep_num + 1)
            if ep_num % (iter_num / 10) == True:
                print(f"epsilon : {ϵ} alpha : {α}")
            s_t = s_0
            while s_t not in self.Terminal_states:
                a_t = self.sampling_action(s_t,π)
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                Q[(s_t,a_t)] = Q[(s_t,a_t)] + α * (r + γ * Q[(s_prime,self.argmax_a_Q(Q,s_prime))] - Q[(s_t,a_t)])
                a_star = self.argmax_a_Q(Q,tuple(s_t))
                for (state,action),value in Q.items():
                    if action == a_star:
                        π[state][action] = 1 - ϵ + ϵ /len(self.A)
                        π[state][action] = ϵ/len(self.A)
                s_t = s_prime
        return π,Q
q_l = Q_learning()
π,Q= q_l.control(s_0 = (2,1),iter_num = 1000,alpha = np.exp(np.log(0.001)/1000),epsilon = 0.995)
rmse = difference(q_star,Q)
epsilon : 0.010874738754866503 alpha : 0.001967886289706926
q_l = Q_learning()
π,Q= q_l.control(s_0 = (2,1),iter_num = 10000,alpha = np.exp(np.log(0.001)/10000),epsilon = 0.9995)
rmse = difference(q_star,Q)
epsilon : 0.011085408054012442 alpha : 0.0019925076614966753
  • It seems to have converged very quickly and accurately!
  • Insight
    • Q-learning converges much faster than the SARSA algorithm.