Skip to content

Sztuczna inteligencja gra na Atari

OpenAI Gym oferuje środowiska retro gier z konsoli Atari na których można testować własnych agentów AI. A przy tej okazji można też odbyć sentymentalną podróż do lat 80/90, czyli czasów ekscytacji pierwszymi komputerami domowymi i nocy spędzonych przed telewizorami wyświetlającymi prostą grafikę z wszelkiego rodzaju 8 bitowców.

Na początek wrzucam na warsztat klasyk wśród klasyków, Pong. Gra ta została wybrana w celu testowania samego algorytmu ale nie należy jej tu traktować jako przypadek szczególny. W moim głównym założeniu, ten sam algorytm, bez wykonywania dodatkowych modyfikacji, ma również radzić sobie z pozostałymi grami.

Dla każdej z gier w OpenAI Gym mamy dwie wersje środowisk, pierwsza jako obserwację daje nam stan pamięci konsoli, druga obraz generowany przez konsolę na ekran. Wybrałem tą drugą, tak jakby grający agent miał oko i był bardziej „ludzki”. Środowisko wygląda następująco:

import gym.spaces
env = gym.make('Pong-v0')

print(env.observation_space)
print(env.action_space)
print(env.unwrapped.get_action_meanings())
Box(210, 160, 3)
Discrete(6)
['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']

Agent obserwuje to co się dzieje na ekranie otrzymując trójkolorowy obraz RGB w rozdzielczości 210×160. Jako działanie może podjąć jedna z 6 akcji dostępnych dla kontrolera konsoli.

Drugim antropomorfizmem lub też animizacją, które cechuje mojego agenta jest pamięć ruchu. Obserwując to co dzieje się w grze agent widzi nie tylko stan bieżący ale także zmiany, które nastąpiły zdefiniowana liczbę kroków wcześniej. Niczym człowiek lub zwierze, które w obserwowanym otoczeniu, naturalnie wyłapuje w pierwszej kolejności te elementy, które się poruszają. Obraz przekazywany do agenta wyglądać może jak poniżej:


Tak otrzymany obraz przepuszczamy jest przez 3 warstwy konwolucyjne, i jedną „płaską” by na wyjściu podjąć wybór (Softmax) jednego z 6 możliwych działań dla kontrolera. Do oceny bieżących poczynań agenta używamy zwracanej przez środowisko nagrody, która w tym wypadku po prostu wynosi 1 dla każdego zdobytego punktu. W przypadku uzyskania nagrody, zapamiętujemy wszystkie poprzedzające obserwacje i akcje jakie przy nich zostały podjęte, które pozwoliły zdobyć agentowi punkt w grze. Gra kończy się w momencie kiedy dowolna ze stron uzyska wynik 21 punktów. Przez pierwsze 200 gier poruszamy się zupełnie losowo co pozwala nam zdobyć w sumie około 150 punktów uzyskanych po około 10k-12k akcjach. Tak baza obserwacji i akcji będzie podstawą do pierwszego treningu sieci. Dane treningowe przepuszczamy przez sieć 10 razy (epochs). W kolejnych próbach poruszamy się już zgodnie z tym co podpowiada nam nauczona tak sieć. Dalej co 10 grę trenujemy sieć ponownie, nowymi przypadkami w których zdobyliśmy punkt. Do ogólnej oceny działania algorytmu używamy średnią z uzyskanego wyniku własnego w ostatnich 100 grach.

Przy sieci zdefiniowanej bez warstw Pooling i Dropout (ponad 120M wag) obliczenia trwają dość długo ale już po około 1200 grach uzyskujemy średni wynik ze 100 gier powyżej 20 punktów. Po 1800 grach agent gra całkiem dobrze i ma wypracowaną własną strategię na osiągnięcie sukcesu.

Kod programu:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Dec 24 21:17:54 2018

@author: majcher.net
"""
import argparse
import os.path
import numpy as np
import gym.spaces
import time
import matplotlib.pyplot as plt
import random
from PIL import Image

#move randomly and explore in the first n games
random_stage=200

#try to play the game NP times
NP=2000

#use last m frames to detect motion
m=5

#be verbose
verbose=1

frame_x = 96
frame_y = 96

parser = argparse.ArgumentParser(description='Agent playing atari pong game.')
parser.add_argument('-l', dest='filename', help='filename of keras model', required=False)
parser.add_argument('-vo', dest='video_output_dir', help='write video output to directory', required=False)
parser.add_argument('-r', help='render environment', dest='render',
                    action='store_true', required=False, default=False)
args = vars(parser.parse_args())

keras_model_file = args['filename']
video_output_dir = args['video_output_dir']

def get_counter(fname="counter.txt"):
    """
    Reads from file and return the experiment counter, also incerease it and save.
    """
    if os.path.exists(fname):
        fcounter = open(fname,"r+")
        counter = fcounter.read()
        counter = int(counter)
        fcounter.seek(0)
        fcounter.write("{}".format(counter+1))
    else:
        fcounter = open(fname,"a+")
        counter = 1
        fcounter.write("{}".format(counter+1))
        
    fcounter.truncate()
    fcounter.close()
    return counter

def onehot(i,n):
    """
    Create and return n elements vector with all 0 and 1 on ith position.
    """
    vec = np.zeros(n)
    vec[i] = 1.
    return vec

def rgb2bw(image):
    """
    Make and return  BW image form RGB got as numpy array. 
    Greyscale of BW image is 0-51
    """
    return np.mean(image/5, axis=2).astype(np.uint8)      
    
def reduceimg(image):
    """
    Resize image to farme_x,frame_y size.
    """
    return np.array(Image.fromarray(image).resize((frame_x, frame_y)))

def movement_trail (curr_frame, prev_frame, prev_trail, frames):
    """
    Make image of movement trail
    
    curr_frame - image of current environment observation
    prev_frame - image of previous environment observation
    prev_trail - image of previous movement trail
    m          - how many frames to remember
    """
    trail = np.add(prev_trail, np.where(curr_frame != prev_frame, frames, 0))
    trail[trail > frames] = frames
    trail = trail - 1
    trail[trail < 0] = 0
    return trail

#set experiment run counter
counter = get_counter()

#select Pong environment
if video_output_dir:
    env_wrapper = gym.make('Pong-v0')
    env = gym.wrappers.Monitor(env_wrapper, video_output_dir, force = True)
else:
    env = gym.make('Pong-v0')

n_actions = len(env.unwrapped.get_action_meanings())
scores = {'C': [], 'A': []}

#define convolutional neural network
from tensorflow.python.keras.models import Sequential, load_model
from tensorflow.python.keras.layers import Input, Dense, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, Dropout, Activation


if keras_model_file:
    nn_model = load_model(keras_model_file)
    init_model = False
else:
    nn_model = Sequential()

    nn_model.add(Conv2D(32, (6, 6), strides=(1, 1), padding='valid', data_format="channels_last", input_shape=(frame_x,frame_y,1)))
    nn_model.add(Activation('relu'))
    nn_model.add(AveragePooling2D(pool_size=(2, 2)))
    nn_model.add(Dropout(0.1))

    nn_model.add(Conv2D(64, (4, 4), strides=(1, 1), padding='valid'))
    nn_model.add(Activation('relu'))
    nn_model.add(AveragePooling2D(pool_size=(2, 2)))
    nn_model.add(Dropout(0.1))

    nn_model.add(Conv2D(128, (2, 2), strides=(1, 1), padding='valid'))
    nn_model.add(Activation('relu'))
    #nn_model.add(MaxPooling2D(pool_size=(2, 2)))
    #nn_model.add(Dropout(0.1))

    nn_model.add(Flatten())
    nn_model.add(Dense(256))
    nn_model.add(Activation('relu'))
    nn_model.add(Dense(n_actions))
    nn_model.add(Activation('softmax'))

    nn_model.compile(optimizer='adam',
                     loss='categorical_crossentropy',
                     metrics=['accuracy'])
    init_model = True

if verbose:
    nn_model.summary()

all_train_X = np.zeros((0,frame_x,frame_y,1))
all_train_Y = np.zeros((0,n_actions))

R = range(NP) if init_model else range(random_stage+1,NP)

for p in R:
    env.reset()
    
    observation, reward, done, info = env.step(0)
    observation = rgb2bw(reduceimg(observation))
    
    prev_observation = observation
    motion = np.zeros(shape=prev_observation.shape)
    train_X = np.zeros((0,frame_x,frame_y,1))
    train_Y = np.zeros((0,n_actions))
   
    done = False
    total_reward = 0
    computer_score = 0

    if p < random_stage:
        eve = 1.
    else:
        eve = 0.

    if not p % 100 and p > random_stage:
        nn_model.save('run_{}_model_step_{}.keras'.format(counter,p))
    
    
    while(not done):
        if args['render']:
            env.render()
            
        #make movement trail
        motion = movement_trail(observation, prev_observation, motion, m)
        
        #combaine current observation of environment and movement trail to make input for CNN
        frame_to_learn = observation + motion * 50
        
        #normalize data
        x = np.array([frame_to_learn/255]).reshape(1,frame_x,frame_y,1)
        
        #choose explore or get best trained action and exploite
        if eve > np.random.random():
            action = env.action_space.sample()
        else:
            pred = nn_model.predict(x)
            action = np.random.choice(n_actions, p=pred[0])
        
        #remember environment state and performed action
        train_X = np.append(train_X, x, axis=0)
        train_Y = np.append(train_Y, onehot(action,n_actions).reshape(1,n_actions), axis=0)
        
        prev_observation = observation
        observation, reward, done, info = env.step(action)
        observation = rgb2bw(reduceimg(observation))
        
        if reward > 0:
            #if scored a point, remrmber all states and moves to train later
            total_reward += 1
            if verbose:
                print ("{}/{}\tAgent scored a point, remember to learn {} moves.".format(computer_score,total_reward,len(train_Y)))
            all_train_X = np.append(all_train_X, train_X, axis=0)
            all_train_Y = np.append(all_train_Y, train_Y, axis=0)
            train_X = np.zeros((0,frame_x,frame_y,1))
            train_Y = np.zeros((0,n_actions))

        elif reward < 0:
            computer_score += 1
            if verbose:
                print ("{}/{}\tAgent lost a point.".format(computer_score,total_reward))
            train_X = np.zeros((0,frame_x,frame_y,1))
            train_Y = np.zeros((0,n_actions))
        

    if p == random_stage:
        epochs = 10
        batch_size = 100
    
        nn_model.fit(x=all_train_X, y=all_train_Y,batch_size=batch_size,epochs=epochs,verbose=verbose)
        all_train_X = np.zeros((0,frame_x,frame_y,1))
        all_train_Y = np.zeros((0,n_actions))
    
    elif p > random_stage and not p % 10:
        
        if random_stage < p <= random_stage + 100:
            epochs = 4
            batch_size = 100
        
        elif random_stage + 100 < p <= random_stage + 200:
            epochs = 3
            batch_size = 100
        
        elif random_stage + 200 < p <= random_stage + 300:
            epochs = 2
            batch_size = 100

        nn_model.fit(x=all_train_X, y=all_train_Y,batch_size=batch_size,epochs=epochs,verbose=verbose)
        all_train_X = np.zeros((0,frame_x,frame_y,1))
        all_train_Y = np.zeros((0,n_actions))

    scores['A'].append(total_reward)
    scores['C'].append(computer_score)
    
    print ("Game {}-{}, eve rate {:3.2f}, result {}/{}, last 100 games: mean score {:4.2f}/{:4.2f}, wins {}/{}"
           .format(counter,p,eve,computer_score,total_reward,
                   np.mean(scores['C'][-100:]),np.mean(scores['A'][-100:]),scores['C'][-100:].count(21),scores['A'][-100:].count(21)))


env.close()

if env_wrapper:
    env_wrapper.close()

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *