-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtrain_bot.py
130 lines (109 loc) · 5.39 KB
/
train_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import datetime
import gzip
import os
import sys
import copy
import numpy as np
import json
from keras.models import Sequential,load_model
from keras.layers import Dense
from keras.layers.advanced_activations import LeakyReLU
from keras.callbacks import EarlyStopping,ModelCheckpoint,TensorBoard
# Folder containing the replay files, given as the first CLI argument.
REPLAY_FOLDER = sys.argv[1]

# Per-replay sample arrays are accumulated here and concatenated before fitting.
training_input = []
training_target = []

# Radius of the square window cut out around each sampled cell.
VISIBLE_DISTANCE = 4
# Four feature planes, each covering a (2 * radius + 1)-wide square window.
input_dim = 4 * (2 * VISIBLE_DISTANCE + 1) * (2 * VISIBLE_DISTANCE + 1)

np.random.seed(0)  # for reproducibility

# Three 512-unit LeakyReLU hidden layers feeding a 5-way softmax
# (one output per move class).
network_layers = [
    Dense(512, input_dim=input_dim),
    LeakyReLU(),
    Dense(512),
    LeakyReLU(),
    Dense(512),
    LeakyReLU(),
    Dense(5, activation='softmax'),
]
model = Sequential(network_layers)
model.compile('nadam', 'categorical_crossentropy', metrics=['accuracy'])
def stack_to_input(stack, position, visible_distance=None):
    """Cut a square window around ``position`` out of ``stack`` and flatten it.

    The window wraps around the edges of axes 1 and 2, so a position near a
    border still yields a full-sized window.

    Args:
        stack: Array-like whose axes 1 and 2 are the two board dimensions
            (the caller in this file passes one (planes, rows, cols) frame).
        position: Pair ``(row, col)`` giving the centre of the window along
            axes 1 and 2 respectively.
        visible_distance: Window radius; the window is
            ``2 * visible_distance + 1`` cells wide. Defaults to the
            module-level ``VISIBLE_DISTANCE`` when omitted.

    Returns:
        A 1-D array holding the window of every plane, flattened in
        (plane, row, col) order.
    """
    if visible_distance is None:
        # Looked up at call time so the default follows the module constant.
        visible_distance = VISIBLE_DISTANCE
    offsets = np.arange(-visible_distance, visible_distance + 1)
    # mode='wrap' maps out-of-range (including negative) indices back into
    # range by modulo.
    rows = np.take(stack, offsets + position[0], axis=1, mode='wrap')
    window = np.take(rows, offsets + position[1], axis=2, mode='wrap')
    return window.flatten()
def _load_replay(path):
    """Parse one replay file that is either plain JSON or gzip-compressed JSON.

    The file is first read as UTF-8 text. If that fails, it is re-read through
    gzip. Both file handles are closed before returning.
    """
    try:
        with open(path, encoding='utf-8') as replay_file:
            return json.load(replay_file)
    except (AttributeError, ValueError):
        # Reading gzip bytes as text fails with UnicodeDecodeError or
        # JSONDecodeError, both ValueError subclasses. AttributeError is kept
        # because the original fallback was keyed on it.
        with gzip.open(path, 'rb') as replay_file:
            return json.loads(replay_file.read().decode('utf-8'))


# List the folder once so the progress total and the iteration always agree.
replay_names = os.listdir(REPLAY_FOLDER)
size = len(replay_names)
for index, replay_name in enumerate(replay_names):
    if not replay_name.endswith('.hlt'):
        continue
    print('Loading {} ({}/{})'.format(replay_name, index, size))
    replay = _load_replay('{}/{}'.format(REPLAY_FOLDER, replay_name))

    frames = np.array(replay['frames'])
    player = frames[:, :, :, 0]
    # Learn from whichever id owns the most cells in the final frame; skip the
    # replay when that id is 0 (presumably unowned cells - confirm against the
    # replay format).
    players, counts = np.unique(player[-1], return_counts=True)
    target_id = players[counts.argmax()]
    if target_id == 0:
        continue

    # Repeat the production map once per frame so it lines up with the
    # per-frame planes.
    prod = np.repeat(np.array(replay['productions'])[np.newaxis],
                     replay['num_frames'], axis=0)
    strength = frames[:, :, :, 1]

    # Only the unmodified moves are used for now; nothing mutates them, so no
    # copy is needed.
    moves_rotations = [replay['moves']]
    for flip_moves in moves_rotations:
        # One-hot encode the moves (5 classes), keeping at most the first 128
        # frames.
        moves = (np.arange(5) == np.array(flip_moves)[:, :, :, None]).astype(int)[:128]
        # Four feature planes per frame: own cells, cells of other non-zero
        # ids, scaled production, scaled strength.
        stacks = np.array([player == target_id,
                           (player != target_id) & (player != 0),
                           prod / 20,
                           strength / 255])
        stacks = stacks.transpose(1, 0, 2, 3)[:len(moves)].astype(np.float32)

        # (frame, row, col) indices of every cell owned by the target player.
        position_indices = stacks[:, 0].nonzero()
        # Weight each cell by the inverse of the owned fraction of its frame,
        # so frames with many owned cells do not dominate the sample.
        sampling_rate = 1 / stacks[:, 0].mean(axis=(1, 2))[position_indices[0]]
        # weight moves 15 times higher than still
        sampling_rate *= moves[position_indices].dot(np.array([1, 15, 15, 15, 15]))
        sampling_rate /= sampling_rate.sum()
        # Draw up to 2048 distinct cells per replay.
        sample_indices = np.transpose(position_indices)[
            np.random.choice(np.arange(len(sampling_rate)),
                             min(len(sampling_rate), 2048),
                             p=sampling_rate, replace=False)]

        replay_input = np.array([stack_to_input(stacks[i], [j, k])
                                 for i, j, k in sample_indices])
        replay_target = moves[tuple(sample_indices.T)]

        training_input.append(replay_input.astype(np.float32))
        training_target.append(replay_target.astype(np.float32))
# Each run logs to its own TensorBoard directory, named after the start time.
now = datetime.datetime.now()
tensorboard = TensorBoard(log_dir='./logs/' + now.strftime('%Y.%m.%d %H.%M'))

# Merge the per-replay sample arrays into one input and one target array.
training_input = np.concatenate(training_input, axis=0)
training_target = np.concatenate(training_target, axis=0)

# Apply one shared random permutation so inputs and targets stay aligned.
indices = np.arange(len(training_input))
np.random.shuffle(indices)  # shuffle training samples
training_input = training_input[indices]
training_target = training_target[indices]

fit_callbacks = [
    EarlyStopping(patience=10),
    ModelCheckpoint('model-test.h5', verbose=1, save_best_only=True),
    tensorboard,
]
model.fit(training_input, training_target,
          validation_split=0.2,
          callbacks=fit_callbacks,
          batch_size=1024,
          nb_epoch=1000)

# Evaluate the saved checkpoint rather than the weights from the last epoch.
model = load_model('model-test.h5')

# Column 0 of the one-hot target marks STILL; every other sample is a move.
still_mask = training_target[:, 0].astype(bool)
for report_label, report_mask in (('STILL accuracy:', still_mask),
                                  ('MOVE accuracy:', ~still_mask)):
    print(report_label,
          model.evaluate(training_input[report_mask],
                         training_target[report_mask],
                         verbose=0)[1])
# # Flip along NS and EW to get 4 configurations
# ns_flip = copy.deepcopy(replay['moves'])
# for i in range(len(replay['moves'])): # For each frame
# for j in range(len(replay['moves'][0])): # For each row along height
# for k in range(len(replay['moves'][0][0])): # For each move along row
# if ns_flip[i][j][k] == 1:
# ns_flip[i][j][k] = 3
# elif ns_flip[i][j][k] == 3:
# ns_flip[i][j][k] = 1
# moves_rotations.append(ns_flip)
#
# ew_flip = copy.deepcopy(replay['moves'])
# for i in range(len(replay['moves'])): # For each frame
# for j in range(len(replay['moves'][0])): # For each row along height
# for k in range(len(replay['moves'][0][0])): # For each move along row
# if ew_flip[i][j][k] == 2:
# ew_flip[i][j][k] = 4
# elif ew_flip[i][j][k] == 4:
# ew_flip[i][j][k] = 2
# moves_rotations.append(ew_flip)
#
# ns_ew_flip = copy.deepcopy(ns_flip)
# for i in range(len(replay['moves'])): # For each frame
# for j in range(len(replay['moves'][0])): # For each row along height
# for k in range(len(replay['moves'][0][0])): # For each move along row
# if ns_ew_flip[i][j][k] == 2:
# ns_ew_flip[i][j][k] = 4
# elif ns_ew_flip[i][j][k] == 4:
# ns_ew_flip[i][j][k] = 2
# moves_rotations.append(ns_ew_flip)