OpenAI Gym oferuje środowiska retro gier z konsoli Atari na których można testować własnych agentów AI. Można przy tej okazji odbyć sentymentalną podróż w czasie do lat 80/90, czyli czasów ekscytacji pierwszymi komputerami domowymi i nocy spędzonych przed telewizorami wyświetlającymi prostą grafikę z wszelkiego rodzaju 8 bitowców.
Na początek klasyk wśród klasyków, Pong. Gra ta została wybrana w celu testowania samego algorytmu ale nie należy jej tu traktować jako przypadek szczególny. W moim głównym założeniu, ten sam algorytm, bez wykonywania znaczących modyfikacji, ma również radzić sobie z pozostałymi grami.
Dla każdej z gier w OpenAI Gym mamy dwie wersje środowisk, pierwsza jako obserwację daje nam stan pamięci konsoli, druga obraz generowany przez konsolę na ekran. Wybrałem tą drugą, tak jakby grający agent miał oko i był bardziej „ludzki”. Środowisko wygląda następująco:
import gym.spaces env = gym.make('Pong-v0') print(env.observation_space) print(env.action_space) print(env.unwrapped.get_action_meanings()) Box(210, 160, 3) Discrete(6) ['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']
Agent obserwuje to co się dzieje na ekranie otrzymując trójkolorowy obraz RGB w rozdzielczości 210×160. Jako działanie może podjąć jedna z 6 akcji dostępnych dla kontrolera konsoli.
Drugim wprowadzonym przeze mnie antropomorfizmem (animizacją?), który będzie cechował mojego agenta jest pamięć ruchu. Obserwując to co dzieje się w grze agent widzi nie tylko stan bieżący ale także zmiany obrazu, które nastąpiły zdefiniowana liczbę kroków wcześniej. Niczym człowiek lub zwierze, które w obserwowanym otoczeniu, naturalnie wyłapuje w pierwszej kolejności te elementy, które się poruszają lub zmieniają kolor. Przykładowy przekazywany do agenta obraz po takiej modyfikacji może wyglądać jak poniżej:
Tak przygotowany obraz przepuszczamy jest przez 3 warstwy konwolucyjne, i jedną „płaską” by na wyjściu podjąć wybór (Softmax) jednego z 6 możliwych działań dla kontrolera. Do oceny bieżących poczynań agenta używamy zwracanej przez środowisko nagrody, która w tym wypadku po prostu wynosi 1 dla każdego zdobytego punktu. W przypadku uzyskania nagrody, zapamiętujemy wszystkie poprzedzające obserwacje i akcje jakie przy nich zostały podjęte, które pozwoliły zdobyć agentowi punkt w grze. Gra kończy się w momencie kiedy dowolna ze stron uzyska wynik 21 punktów. Przez pierwsze 200 gier poruszamy się zupełnie losowo co pozwala nam zdobyć w sumie około 150 punktów uzyskanych po około 10k-12k akcjach. Tak baza obserwacji i akcji będzie podstawą do pierwszego treningu sieci. Dane treningowe przepuszczamy przez sieć 10 razy (epochs). W kolejnych próbach poruszamy się już zgodnie z tym co podpowiada nam nauczona tak sieć. Dalej co 10 grę trenujemy sieć ponownie, nowymi przypadkami w których zdobyliśmy punkt. Do ogólnej oceny działania algorytmu używamy średnią z uzyskanego wyniku własnego w ostatnich 100 grach.
Przy sieci zdefiniowanej bez warstw Pooling i Dropout (ponad 120M wag) obliczenia trwają dość długo ale już po około 1200 grach uzyskujemy średni wynik ze 100 gier powyżej 20 punktów. Po 1800 grach agent gra całkiem dobrze i ma wypracowaną własną strategię na osiągnięcie sukcesu.
Kod programu:
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Mon Dec 24 21:17:54 2018 @author: majcher.net """ import argparse import os.path import numpy as np import gym.spaces import time import matplotlib.pyplot as plt import random from PIL import Image #move randomly and explore in the first n games random_stage=200 #try to play the game NP times NP=2000 #use last m frames to detect motion m=5 #be verbose verbose=1 frame_x = 96 frame_y = 96 parser = argparse.ArgumentParser(description='Agent playing atari pong game.') parser.add_argument('-l', dest='filename', help='filename of keras model', required=False) parser.add_argument('-vo', dest='video_output_dir', help='write video output to directory', required=False) parser.add_argument('-r', help='render environment', dest='render', action='store_true', required=False, default=False) args = vars(parser.parse_args()) keras_model_file = args['filename'] video_output_dir = args['video_output_dir'] def get_counter(fname="counter.txt"): """ Reads from file and return the experiment counter, also incerease it and save. """ if os.path.exists(fname): fcounter = open(fname,"r+") counter = fcounter.read() counter = int(counter) fcounter.seek(0) fcounter.write("{}".format(counter+1)) else: fcounter = open(fname,"a+") counter = 1 fcounter.write("{}".format(counter+1)) fcounter.truncate() fcounter.close() return counter def onehot(i,n): """ Create and return n elements vector with all 0 and 1 on ith position. """ vec = np.zeros(n) vec[i] = 1. return vec def rgb2bw(image): """ Make and return BW image form RGB got as numpy array. Greyscale of BW image is 0-51 """ return np.mean(image/5, axis=2).astype(np.uint8) def reduceimg(image): """ Resize image to farme_x,frame_y size. """ return np.array(Image.fromarray(image).resize((frame_x, frame_y))) def movement_trail (curr_frame, prev_frame, prev_trail, frames): """ Make image of movement trail curr_frame - image of current environment observation prev_frame - image of previous environment observation prev_trail - image of previous movement trail m - how many frames to remember """ trail = np.add(prev_trail, np.where(curr_frame != prev_frame, frames, 0)) trail[trail > frames] = frames trail = trail - 1 trail[trail < 0] = 0 return trail #set experiment run counter counter = get_counter() #select Pong environment if video_output_dir: env_wrapper = gym.make('Pong-v0') env = gym.wrappers.Monitor(env_wrapper, video_output_dir, force = True) else: env = gym.make('Pong-v0') n_actions = len(env.unwrapped.get_action_meanings()) scores = {'C': [], 'A': []} #define convolutional neural network from tensorflow.python.keras.models import Sequential, load_model from tensorflow.python.keras.layers import Input, Dense, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, Dropout, Activation if keras_model_file: nn_model = load_model(keras_model_file) init_model = False else: nn_model = Sequential() nn_model.add(Conv2D(32, (6, 6), strides=(1, 1), padding='valid', data_format="channels_last", input_shape=(frame_x,frame_y,1))) nn_model.add(Activation('relu')) nn_model.add(AveragePooling2D(pool_size=(2, 2))) nn_model.add(Dropout(0.1)) nn_model.add(Conv2D(64, (4, 4), strides=(1, 1), padding='valid')) nn_model.add(Activation('relu')) nn_model.add(AveragePooling2D(pool_size=(2, 2))) nn_model.add(Dropout(0.1)) nn_model.add(Conv2D(128, (2, 2), strides=(1, 1), padding='valid')) nn_model.add(Activation('relu')) #nn_model.add(MaxPooling2D(pool_size=(2, 2))) #nn_model.add(Dropout(0.1)) nn_model.add(Flatten()) nn_model.add(Dense(256)) nn_model.add(Activation('relu')) nn_model.add(Dense(n_actions)) nn_model.add(Activation('softmax')) nn_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy']) init_model = True if verbose: nn_model.summary() all_train_X = np.zeros((0,frame_x,frame_y,1)) all_train_Y = np.zeros((0,n_actions)) R = range(NP) if init_model else range(random_stage+1,NP) for p in R: env.reset() observation, reward, done, info = env.step(0) observation = rgb2bw(reduceimg(observation)) prev_observation = observation motion = np.zeros(shape=prev_observation.shape) train_X = np.zeros((0,frame_x,frame_y,1)) train_Y = np.zeros((0,n_actions)) done = False total_reward = 0 computer_score = 0 if p < random_stage: eve = 1. else: eve = 0. if not p % 100 and p > random_stage: nn_model.save('run_{}_model_step_{}.keras'.format(counter,p)) while(not done): if args['render']: env.render() #make movement trail motion = movement_trail(observation, prev_observation, motion, m) #combaine current observation of environment and movement trail to make input for CNN frame_to_learn = observation + motion * 50 #normalize data x = np.array([frame_to_learn/255]).reshape(1,frame_x,frame_y,1) #choose explore or get best trained action and exploite if eve > np.random.random(): action = env.action_space.sample() else: pred = nn_model.predict(x) action = np.random.choice(n_actions, p=pred[0]) #remember environment state and performed action train_X = np.append(train_X, x, axis=0) train_Y = np.append(train_Y, onehot(action,n_actions).reshape(1,n_actions), axis=0) prev_observation = observation observation, reward, done, info = env.step(action) observation = rgb2bw(reduceimg(observation)) if reward > 0: #if scored a point, remrmber all states and moves to train later total_reward += 1 if verbose: print ("{}/{}\tAgent scored a point, remember to learn {} moves.".format(computer_score,total_reward,len(train_Y))) all_train_X = np.append(all_train_X, train_X, axis=0) all_train_Y = np.append(all_train_Y, train_Y, axis=0) train_X = np.zeros((0,frame_x,frame_y,1)) train_Y = np.zeros((0,n_actions)) elif reward < 0: computer_score += 1 if verbose: print ("{}/{}\tAgent lost a point.".format(computer_score,total_reward)) train_X = np.zeros((0,frame_x,frame_y,1)) train_Y = np.zeros((0,n_actions)) if p == random_stage: epochs = 10 batch_size = 100 nn_model.fit(x=all_train_X, y=all_train_Y,batch_size=batch_size,epochs=epochs,verbose=verbose) all_train_X = np.zeros((0,frame_x,frame_y,1)) all_train_Y = np.zeros((0,n_actions)) elif p > random_stage and not p % 10: if random_stage < p <= random_stage + 100: epochs = 4 batch_size = 100 elif random_stage + 100 < p <= random_stage + 200: epochs = 3 batch_size = 100 elif random_stage + 200 < p <= random_stage + 300: epochs = 2 batch_size = 100 nn_model.fit(x=all_train_X, y=all_train_Y,batch_size=batch_size,epochs=epochs,verbose=verbose) all_train_X = np.zeros((0,frame_x,frame_y,1)) all_train_Y = np.zeros((0,n_actions)) scores['A'].append(total_reward) scores['C'].append(computer_score) print ("Game {}-{}, eve rate {:3.2f}, result {}/{}, last 100 games: mean score {:4.2f}/{:4.2f}, wins {}/{}" .format(counter,p,eve,computer_score,total_reward, np.mean(scores['C'][-100:]),np.mean(scores['A'][-100:]),scores['C'][-100:].count(21),scores['A'][-100:].count(21))) env.close() if env_wrapper: env_wrapper.close()