L'article précédent était ici. Créez le code expérimental CNN en ajoutant ou en modifiant le code expérimental créé dans ici. L'expérience utilise l'ensemble de données MNIST de scicit-learn en raison du problème de temps d'exécution. La différence avec le jeu de données MNIST normal
C'est. Grâce à cela, le temps d'apprentissage n'est que de quelques dizaines de secondes (dans mon environnement). Pour le moment, le code expérimental de l'ensemble de données Keras complet est également inclus. Cela semble prendre plusieurs heures dans mon environnement, alors j'ai abandonné ...
Trainer
--Modification de l'implémentation de la fonction forward
de la classe Trainer
. Approximativement, lorsque vous essayez de diffuser des données de 10 Mo ou plus à la fois, elles sont divisées et diffusées en continu._TypeManager
](Changer la classe #_typemanager)Trainer
](Ajouter la classe #trainer)LayerManager
](Changer la classe #layermanager)_TypeManager
Tout d'abord, ajoutez-le à la classe _TypeManager
afin que la classe LayerManager
puisse gérer ConvLayer
et PoolingLayer
.
_type_manager.py
class _TypeManager():
"""
Classe de gestionnaire pour les types de couches
"""
N_TYPE = 4 #Nombre de types de couches
BASE = -1
MIDDLE = 0 #Numérotation des couches intermédiaires
OUTPUT = 1 #Numérotation des couches de sortie
CONV = 2 #Numérotation des couches de pliage
POOL = 3 #Numérotation de la couche de pooling
REGULATED_DIC = {"Middle": MiddleLayer,
"Output": OutputLayer,
"Conv": ConvLayer,
"Pool": PoolingLayer,
"BaseLayer": None}
@property
def reg_keys(self):
return list(self.REGULATED_DIC.keys())
def name_rule(self, name):
name = name.lower()
if "middle" in name or name == "mid" or name == "m":
name = self.reg_keys[self.MIDDLE]
elif "output" in name or name == "out" or name == "o":
name = self.reg_keys[self.OUTPUT]
elif "conv" in name or name == "c":
name = self.reg_keys[self.CONV]
elif "pool" in name or name == "p":
name = self.reg_keys[self.POOL]
else:
raise UndefinedLayerError(name)
return name
«CONV» et «POOL» sont ajoutés en tant que constantes, et «REGULATED_DIC» est utilisé pour obtenir l'objet de calque à partir du nom du calque.
De plus, comme il y avait de nombreuses occasions où la liste clés
de REGURATED_DIC
était nécessaire, nous avons ajouté une couche de convolution et une couche de regroupement aux règles de propriété et de dénomination.
Les fonctions d'apprentissage et de prédiction sont séparées de la classe LayerManager
en tant que classe Trainer
.
trainer.py
import time
import numpy as np
softmax = type(get_act("softmax"))
sigmoid = type(get_act("sigmoid"))
class Trainer():
def __init__(self, x, y):
self.x_train, self.x_test = x
self.y_train, self.y_test = y
self.make_anim = False
def forward(self, x, lim_memory=10):
def propagate(x):
x_in = x
n_batch = x.shape[0]
switch = True
for ll in self.layer_list:
if switch and not self.is_CNN(ll.name):
x_in = x_in.reshape(n_batch, -1)
switch = False
x_in = ll.forward(x_in)
#Parce que la méthode de propagation directe est également utilisée pour le calcul d'erreur et la prédiction de données inconnues
#La capacité de mémoire peut être importante
if np.prod(x.shape)*8/2**20 >= 10:
#Nombre à virgule flottante double précision(8byte)À 10 Mo(=10*2**20)Plus que
#Lorsque vous utilisez de la mémoire, divisez-la en 5 Mo ou moins et exécutez
n_batch = int(5*2**20/(8*np.prod(x.shape[1:])))
y = np.zeros((x.shape[0], lm[-1].n))
n_loop = int(np.ceil(x.shape[0]/n_batch))
for i in range(n_loop):
propagate(x[i*n_batch : (i+1)*n_batch])
y[i*n_batch : (i+1)*n_batch] = lm[-1].y.copy()
lm[-1].y = y
else:
#Sinon, exécutez normalement
propagate(x)
def backward(self, t):
y_in = t
n_batch = t.shape[0]
switch = True
for ll in self.layer_list[::-1]:
if switch and self.is_CNN(ll.name):
y_in = y_in.reshape(n_batch, *ll.O_shape)
switch = False
y_in = ll.backward(y_in)
def update(self, **kwds):
for ll in self.layer_list:
ll.update(**kwds)
def training(self, epoch, n_batch=16, threshold=1e-8,
show_error=True, show_train_error=False, **kwds):
if show_error:
self.error_list = []
if show_train_error:
self.train_error_list = []
if self.make_anim:
self.images = []
self.n_batch = n_batch
n_train = self.x_train.shape[0]//n_batch
n_test = self.x_test.shape[0]
#Commencer à apprendre
start_time = time.time()
lap_time = -1
error = 0
error_prev = 0
rand_index = np.arange(self.x_train.shape[0])
for t in range(1, epoch+1):
#Création de scène
if self.make_anim:
self.make_scene(t, epoch)
#Calcul des erreurs d'entraînement
if show_train_error:
self.forward(self.x_train)
error = lm[-1].get_error(self.y_train)
self.train_error_list.append(error)
#Calcul d'erreur
self.forward(self.x_test)
error = lm[-1].get_error(self.y_test)
if show_error:
self.error_list.append(error)
#Jugement de convergence
if np.isnan(error):
print("fail training...")
break
if abs(error - error_prev) < threshold:
print("end learning...")
break
else:
error_prev = error
t_percent = int(50*t/epoch)
np.random.shuffle(rand_index)
for i in range(n_train):
i_percent = int(50*(i+1)/n_train)
if i_percent <= t_percent:
time_stamp = ("progress:[" + "X"*i_percent
+ "\\"*(t_percent-i_percent)
+ " "*(50-t_percent) + "]")
else:
time_stamp = ("progress:[" + "X"*t_percent
+ "/"*(i_percent-t_percent)
+ " "*(50-i_percent) + "]")
elapsed_time = time.time() - start_time
print("\r" + time_stamp
+ "{}s/{}s".format(
int(elapsed_time),
int(lap_time*epoch) if lap_time > 0 else "?"),
end="")
rand = rand_index[i*n_batch : (i+1)*n_batch]
self.forward(self.x_train[rand])
self.backward(self.y_train[rand])
self.update(**kwds)
if lap_time < 0:
lap_time = time.time() - start_time
print()
if show_error:
#Affichage de transition d'erreur
self.show_errors(show_train_error, **kwds)
def pred_func(self, y, threshold=0.5):
if isinstance(self[-1].act, softmax):
return np.argmax(y, axis=1)
elif isinstance(self[-1].act, sigmoid):
return np.where(y > threshold, 1, 0)
else:
raise NotImplemented
def predict(self, x=None, y=None, threshold=0.5):
if x is None:
x = self.x_test
if y is None:
y = self.y_test
self.forward(x)
self.y_pred = self.pred_func(self[-1].y, threshold=threshold)
y = self.pred_func(y, threshold=threshold)
print("correct:", y[:min(16, int(y.shape[0]*0.1))])
print("predict:", self.y_pred[:min(16, int(y.shape[0]*0.1))])
print("accuracy rate:", np.sum(self.y_pred == y, dtype=int)/y.shape[0]*100, "%",
"({}/{})".format(np.sum(self.y_pred == y, dtype=int), y.shape[0]))
return self.y_pred
def show_errors(self, show_train_error=False, title="error transition",
xlabel="epoch", ylabel="error", fname="error_transition.png ",
log_scale=True, **kwds):
fig, ax = plt.subplots(1)
fig.suptitle(title)
if log_scale:
ax.set_yscale("log")
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.grid()
if show_train_error:
ax.plot(self.train_error_list, label="train accuracy")
ax.plot(self.error_list, label="test accuracy")
ax.legend(loc="best")
#fig.show()
if len(fname) != 0:
fig.savefig(fname)
def ready_anim(self, n_image, x, y, title="animation",
xlabel="x", ylabel="y", ex_color="r", color="b",
x_left=0, x_right=0, y_down = 1, y_up = 1):
self.n_image = n_image
self.x = x
self.color = color
self.make_anim = True
self.anim_fig, self.anim_ax = plt.subplots(1)
self.anim_fig.suptitle(title)
self.anim_ax.set_xlabel(xlabel)
self.anim_ax.set_ylabel(ylabel)
self.anim_ax.set_xlim(np.min(x) - x_left, np.max(x) + x_right)
self.anim_ax.set_ylim(np.min(y) - y_down, np.max(y) + y_up)
self.anim_ax.grid()
self.anim_ax.plot(x, y, color=ex_color)
return self.anim_fig, self.anim_ax
def make_scene(self, t, epoch):
#Création de scène
if t % (epoch/self.n_image) == 1:
x_in = self.x.reshape(-1, 1)
for ll in self.layer_list:
x_in = ll.forward(x_in)
im, = self.anim_ax.plot(self.x, ll.y, color=self.color)
self.images.append([im])
La raison pour laquelle les fonctions forward
, backward
et ʻupdatesont séparées en tant que fonctions est que si vous voulez faire quelque chose d'original, vous pouvez simplement lancer la méthode que vous voulez que la fonction
forward` fasse dans la propagation avant. Faire. Je pense qu'il y a un peu plus de place pour l'ingéniosité ...
De plus, étant donné que la fonction «avant» est également utilisée pour le calcul d'erreur et le calcul de prédiction, une énorme quantité de données peut circuler. Par conséquent, en supposant que les données de nombre à virgule flottante double précision (8 octets) ont circulé, si elles dépassent les 10 Mo estimés, elles ont été modifiées pour les diviser en environ 5 Mo et les transférer.
La fonction training
décrit le flux d'apprentissage. Je l'ai ajouté parce que je pensais que la transition d'erreur des données d'entraînement était également similaire.
De plus, le jugement de «NaN» est également inclus dans le jugement de convergence, et l'apprentissage est terminé immédiatement si l'apprentissage échoue.
Aussi, jusqu'à présent, la progression était affichée en utilisant le module tqdm
, mais je l'ai préparé moi-même.
"" Affiche la progression de l'époque et "/" indique l'état de digestion du lot.
La fonction «prédire» fait littéralement des prédictions pour les données de test. Des arguments facultatifs sont utilisés et, s'ils ne sont pas spécifiés, les données de test détenues par le gestionnaire de couches seront utilisées.
Après avoir transmis les données de test, le format des données est modifié par pred_func
et le taux de réponse correct est calculé. Il semble que nous devons changer un peu ici aussi ... Cela ne donnera que le taux de réponse correcte pour les questions de classification ...
LayerManager
Avec l'ajout des classes ConvLayer
et Pooling
, des modifications mineures ont été nécessaires.
layer_manager.py
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import tqdm
class LayerManager(_TypeManager, Trainer):
"""
Classe Manager pour la gestion des couches
"""
def __init__(self, x, y):
super().__init__(x, y)
self.__layer_list = [] #Liste des couches
self.__name_list = [] #Liste de noms pour chaque couche
self.__ntype = np.zeros(self.N_TYPE, dtype=int) #Nombre de couches par type
def __repr__(self):
layerRepr= "layer_list: " + repr(self.__layer_list)
nameRepr = "name_list: " + repr(self.__name_list)
ntypeRepr = "ntype: " + repr(self.__ntype)
return (layerRepr + "\n"
+ nameRepr + "\n"
+ ntypeRepr)
def __str__(self):
layerStr = "layer_list: " + str(self.__layer_list)
nameStr = "name_list: " + str(self.__name_list)
ntypeStr = "ntype: " + str(self.__ntype)
return (layerStr + "\n"
+ nameStr + "\n"
+ ntypeStr)
def __len__(self):
"""
Fonctions intégrées Python`len`Décrit l'opération lorsqu'elle est appelée depuis.
Renvoie la somme du nombre de couches par type.
"""
return int(np.sum(self.__ntype))
def __getitem__(self, key):
"""
Par exemple
lm = LayerManager()
+----------------+
| (Ajouter un élément à lm) |
+----------------+
x = lm[3].~~
Est appelé lors de l'accès à un élément d'une liste ou d'un tableau, comme
Décrivez l'opération à ce moment-là.
tranche et str,Autoriser uniquement l'accès int.
"""
if isinstance(key, slice):
#Si la clé est une tranche, reportez-vous à la liste des calques avec tranche.
#Valeur inhabituelle(Index hors de portée, etc.)Quand est entré
#Python me donne une erreur.
return self.__layer_list[key]
elif isinstance(key, str):
#Si key est une chaîne, récupérez l'index dans la liste des noms de chaque couche et
#Renvoie les éléments de la liste des couches applicables.
if key in self.__name_list:
index = self.__name_list.index(key)
return self.__layer_list[index]
else:
#Si la clé n'existe pas, une KeyError est émise.
raise KeyError("{}: No such item".format(key))
elif isinstance(key, int):
#Si key est un entier, renvoie l'élément correspondant dans la liste des couches.
#Valeur inhabituelle(Index hors de portée, etc.)Quand est entré
#Python me donne une erreur.
return self.__layer_list[key]
else:
raise KeyError(key, ": Undefined such key type.")
def __setitem__(self, key, value):
"""
Par exemple
lm = LayerManager()
+----------------+
| (Ajouter un élément à lm) |
+----------------+
lm[1] = x
Est appelé lors de l'accès à un élément d'une liste ou d'un tableau, comme
Décrivez l'opération à ce moment-là.
Seul l'écrasement des éléments est autorisé et l'ajout de nouveaux éléments est interdit.
"""
value_type = ""
if isinstance(value, list):
#Spécifié sur le côté droit'value'Mais'list'Si
#Tous les éléments'BaseLayer'Erreur si classe ou ne l'hérite pas.
if not np.all(
np.where(isinstance(value, BaseLayer), True, False)):
self.AssignError()
value_type = "list"
elif isinstance(value, BaseLayer):
#Spécifié sur le côté droit'value'Mais'BaseLayer'Est-ce une classe?
#Erreur s'il n'est pas hérité.
self.AssignError(type(value))
if value_type == "":
value_type = self.reg_keys[self.BASE]
if isinstance(key, slice):
#Si la clé est une tranche, écrasez l'élément dans la liste des calques.
#pourtant'value_type'Mais'list'Sinon, une erreur.
#Valeur inhabituelle(Index hors de portée, etc.)Quand est entré
#Python me donne une erreur.
if value_type != "list":
self.AssignError(value_type)
self.__layer_list[key] = value
elif isinstance(key, str):
#Si key est une chaîne, récupérez l'index dans la liste des noms de chaque couche et
#Remplacez les éléments dans la liste des calques applicables.
#pourtant'value_type'Mais'BaseLayer'Sinon, une erreur.
if value_type != self.reg_keys[self.BASE]:
raise AssignError(value_type)
if key in self.__name_list:
index = self.__name_list.index(key)
self.__layer_list[index] = value
else:
#Si la clé n'existe pas, une KeyError est émise.
raise KeyError("{}: No such item".format(key))
elif isinstance(key, int):
#Si la clé est un entier, écrasez l'élément correspondant dans la liste des couches.
#pourtant'value_type'Mais'BaseLayer'Sinon, une erreur.
#Aussi, une valeur anormale(Index hors de portée, etc.)Quand est entré
#Python me donne une erreur.
if value_type != self.reg_keys[self.BASE]:
raise AssignError(value_type)
self.__layer_list[key] = value
else:
raise KeyError(key, ": Undefined such key type.")
def __delitem__(self, key):
"""
Par exemple
lm = LayerManager()
+----------------+
| (Ajouter un élément à lm) |
+----------------+
del lm[2]
Parce qu'il est appelé lorsque l'élément de la liste ou du tableau est accédé par l'instruction del comme
Décrivez l'opération à ce moment-là.
Si l'élément spécifié existe, il sera supprimé et renommé.
"""
if isinstance(key, slice):
#Si la clé est une tranche, supprimez l'élément spécifié tel quel
#Valeur inhabituelle(Index hors de portée, etc.)Quand est entré
#Python me donne une erreur.
del self.__layer_list[slice]
del self.__name_list[slice]
elif isinstance(key, str):
#Si key est une chaîne, récupérez l'index dans la liste des noms de chaque couche et
#Supprimez l'élément concerné.
if key in self.__name_list:
del self.__layer_list[index]
del self.__name_list[index]
else:
#Si la clé n'existe pas, une KeyError est émise.
raise KeyError("{}: No such item".format(key))
elif isinstance(key, int):
#Si la clé est un entier, supprimez l'élément correspondant dans la liste des couches.
#Valeur inhabituelle(Index hors de portée, etc.)Quand est entré
#Python me donne une erreur.
del self.__layer_list[key]
else:
raise KeyError(key, ": Undefined such key type.")
#Renommer
self._rename()
def _rename(self):
"""
Lorsque la dénomination de la liste de noms enfreint les règles en raison de l'opération de liste
Renommez la liste de dénomination et chaque couche pour respecter à nouveau les règles.
La règle de dénomination est[Type de calque][Quel nombre]ça ira.
Si le type de calque est Couche intermédiaire, Milieu
Sortie pour la couche de sortie
Il est abrégé en.
Le nombre est compté par type.
Aussi, ici encore__Compte ntypes.
"""
#Initialiser le nombre de couches par type
self.__ntype = np.zeros(self.N_TYPE)
#Recompter et renommer chaque couche
for i in range(len(self)):
for j, reg_name in enumerate(self.REGULATED_DIC):
if reg_name in self.__name_list[i]:
self.__ntype[j] += 1
self.__name_list[i] = (self.reg_keys[j]
+ str(self.__ntype[j]))
self.__layer_list[i].name = (self.reg_keys[j]
+ str(self.__ntype[j]))
break
else:
raise UndefinedLayerType(self.__name_list[i])
def append(self, *, name="Middle", **kwds):
"""
Implémentation de la méthode append familière, qui est une méthode pour ajouter des éléments à une liste.
"""
if "prev" in kwds:
# 'prev'Est inclus dans le mot-clé
#Cela signifie que le nombre d'éléments de la couche précédente est spécifié.
#Fondamentalement, il est censé être le moment d'insérer la première couche, donc
#En dehors de cela, il est essentiellement déterminé automatiquement et n'est pas spécifié.
if len(self) != 0:
if kwds["prev"] != self.__layer_list[-1].n:
#Erreur s'il ne correspond pas au nombre d'unités à la fin.
raise UnmatchUnitError(self.__layer_list[-1].n,
kwds["prev"])
elif not self.is_CNN(name):
if len(self) == 0:
#La première couche DNN doit toujours spécifier le nombre d'unités d'entrée.
raise UnmatchUnitError("Input units", "Unspecified")
else:
#Le nombre d'unités dans la dernière couche'kwds'Ajouter à
kwds["prev"] = self.__layer_list[-1].n
#Lisez le type de couche et modifiez le nom selon la règle de dénomination
name = self.name_rule(name)
#Ajoutez un calque.
for i, reg_name in enumerate(self.REGULATED_DIC):
if name in reg_name:
#Incrémenter la couche par type
self.__ntype[i] += 1
#Ajouter au nom
name += str(self.__ntype[i])
#Ajouter à la liste de noms
self.__name_list.append(name)
#Enfin, créez un calque et ajoutez-le à la liste.
self.__layer_list.append(self.REGULATED_DIC[reg_name](name=name,**kwds))
def extend(self, lm):
"""
Un autre gestionnaire de couches qui existe déjà dans la méthode d'extension'lm'Des éléments de
Tout ajouter.
"""
if not isinstance(lm, LayerManager):
# 'lm'Erreur si l'instance de n'est pas LayerManager.
raise TypeError(type(lm), ": Unexpected type.")
if len(self) != 0:
if self.__layer_list[-1].n != lm[0].prev:
#Avec le nombre d'unités dans votre dernière couche
# 'lm'Erreur si le nombre d'entrées dans la première couche de n'est pas le même.
raise UnmatchUnitError(self.__layer_list[-1].n,
lm[0].prev)
#Chaque'extend'Ajouter par méthode
self.__layer_list.extend(lm.layer_list)
self.__name_list.extend(lm.name_list)
#Renommer
self._rename()
def insert(self, prev_name, name="Middle", **kwds):
"""
Dans la méthode d'insertion, spécifiez le nom du calque précédent et combinez-le avec ce calque.
Ajoutez un élément.
"""
# 'prev_name'Erreur si n'existe pas.
if not prev_name in self.__name_list:
raise KeyError(prev_name, ": No such key.")
# 'prev'Est inclus dans le mot-clé
# 'prev_name'Erreur s'il ne correspond pas au nombre d'unités du calque spécifié dans.
if "prev" in kwds:
if kwds["prev"] \
!= self.__layer_list[self.index(prev_name)].n:
raise UnmatchUnitError(
kwds["prev"],
self.__layer_list[self.index(prev_name)].n)
# 'n'Est inclus dans le mot-clé
if "n" in kwds:
# 'prev_name'Si ce n'est pas le dernier
if prev_name != self.__name_list[-1]:
#Erreur s'il ne correspond pas au nombre d'unités dans la couche suivante.
if kwds["n"] != self.__layer_list[
self.index(prev_name)+1].prev:
raise UnmatchUnitError(
kwds["n"],
self.__layer_list[self.index(prev_name)].prev)
#S'il n'y a pas encore d'éléments'append'Donnez une erreur pour utiliser la méthode.
if len(self) == 0:
raise RuntimeError(
"You have to use 'append' method instead.")
#Obtenir l'index de l'emplacement d'insertion
index = self.index(prev_name) + 1
#Lisez le type de couche et modifiez le nom selon la règle de dénomination
name = self.name_rule(name)
#Insérer un élément
for i, reg_name in enumerate(self.REGULATED_DIC):
if reg_name in name:
self.__layer_list.insert(index,
self.REGULATED_DIC[reg_name](name=name,**kwds))
self.__name_list.insert(index,
self.REGULATED_DIC[reg_name](name=name,**kwds))
#Renommer
self._rename()
def extend_insert(self, prev_name, lm):
"""
C'est la fonction d'origine.
Il se comporte comme une combinaison de la méthode extend et de la méthode insert.
En termes simples, c'est comme insérer un autre gestionnaire de calques.
"""
if not isinstance(lm, LayerManager):
# 'lm'Erreur si l'instance de n'est pas LayerManager.
raise TypeError(type(lm), ": Unexpected type.")
# 'prev_name'Erreur si n'existe pas.
if not prev_name in self.__name_list:
raise KeyError(prev_name, ": No such key.")
#Le nombre d'unités des couches avant et après l'emplacement spécifié et les première et dernière couches de lm
#S'ils ne correspondent pas, une erreur se produit.
if len(self) != 0:
if self.__layer_list[self.index(prev_name)].n \
!= lm.layer_list[0].prev:
#Avec le nombre d'unités dans votre emplacement désigné'lm'Le premier nombre d'unités dans
#S'ils ne correspondent pas, une erreur se produit.
raise UnmatchUnitError(
self.__layer_list[self.index(prev_name)].n,
lm.layer_list[0].prev)
if prev_name != self.__name_list[-1]:
# 'prev_name'N'est-ce pas ma dernière couche
if lm.layer_list[-1].n \
!= self.__layer_list[self.index(prev_name)+1].prev:
# 'lm'Le nombre d'unités à la fin et au niveau suivant de votre emplacement désigné
# 'prev'Erreur s'il ne correspond pas au nombre d'unités.
raise UnmatchUnitError(
lm.layer_list[-1].n,
self.__layer_list[self.index(prev_name)+1].prev)
else:
#Si vous n'avez aucun élément'extend'J'obtiens une erreur lors de l'utilisation de la méthode.
raise RuntimeError(
"You have to use 'extend' method instead.")
#Obtenir l'index de l'emplacement d'insertion
index = self.index(prev_name) + 1
#Éléments après l'emplacement d'insertion'buf'Après avoir évacué vers, retirez-le une fois
#Ajouter un élément à l'aide de la méthode extend
layer_buf = self.__layer_list[index:]
name_buf = self.__name_list[index:]
del self.__layer_list[index:]
del self.__name_list[index:]
self.extend(lm)
#Ajouter l'élément qui a été évacué
self.__layer_list.extend(layer_buf)
self.__name_list.extend(name_buf)
#Renommer
self._rename()
def remove(self, key):
"""
La méthode remove supprime l'élément avec le nom spécifié.
Il est également autorisé à être spécifié par index.
"""
#Déjà implémenté'del'La phrase est OK.
del self[key]
def index(self, target):
return self.__name_list.index(target)
def name(self, indices):
return self.__name_list[indices]
@property
def layer_list(self):
return self.__layer_list
@property
def name_list(self):
return self.__name_list
@property
def ntype(self):
return self.__ntype
def is_CNN(self, name=None):
if name is None:
if self.__ntype[self.CONV] > 0 \
or self.__ntype[self.POOL] > 0:
return True
else:
return False
else:
name = self.name_rule(name)
if self.reg_keys[self.CONV] in name \
or self.reg_keys[self.POOL] in name:
return True
else:
return False
Parmi les petits changements, j'omettrai les parties qui n'ont pas d'importance.
La partie à omettre est le changement dû à l'amélioration de la classe _TypeManager
. Le principal changement est l'utilisation de la propriété reg_keys
.
Le grand changement est qu'il est trop coûteux d'augmenter la branche conditionnelle à chaque fois que le type de couche est augmenté, j'ai donc rendu possible de le faire en boucle. A titre d'exemple, regardons la partie pertinente de la méthode ʻappend`.
layer_manager.py
#Ajoutez un calque.
for i, reg_name in enumerate(self.REGULATED_DIC):
if name in reg_name:
#Incrémenter la couche par type
self.__ntype[i] += 1
#Ajouter au nom
name += str(self.__ntype[i])
#Ajouter à la liste de noms
self.__name_list.append(name)
#Enfin, créez un calque et ajoutez-le à la liste.
self.__layer_list.append(self.REGULATED_DIC[reg_name](name=name,**kwds))
Le REGULATED_DIC
est mis en boucle avec la fonction ʻenumerate, et quand le nom de la couche est inclus dans le
reg_name, le numéro de couche ʻi
est utilisé pour le traitement.
** Par conséquent, les constantes de couche de la classe _TypeManager
et l'index d'enregistrement de REGULATED_DIC
doivent être alignés. ** **
Les autres parties sont similaires.
Enfin, nous avons préparé la fonction ʻis_CNN. Ceci renvoie si le réseau de la classe
LayerManger est un CNN, sauf si spécifié dans l'argument
nom. Si un nom de couche est spécifié pour
nom`, il renvoie si le nom de couche mérite un CNN (c'est-à-dire s'il s'agit d'une couche de convolution ou d'une couche de pooling).
Il est utilisé pour la propagation avant et arrière de la classe «Trainer».
Maintenant, passons à l'expérience CNN. Le code complet peut être trouvé ici [https://github.com/kuroitu/DNN_test). N'hésitez pas à cloner / copier et expérimenter.
Commençons par le jeu de données Keras. L'ensemble de données MNIST de Keras contient 60000 données d'entraînement et 10000 données de test, et la taille de l'image est de (28, 28) $, donc même s'il s'agit d'un petit ensemble de données d'apprentissage automatique, vous pouvez l'apprendre sur un ordinateur portable, etc. Est un ensemble de données assez volumineux.
keras_data.py
import numpy as np
from keras.datasets import mnist
#from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tqdm
#Acquisition de jeux de données
n_class=10
(x_train, y_train), (x_test, y_test) = mnist.load_data()
C, B, I_h, I_w = 1, *x_train.shape
B_test = x_test.shape[0]
#Standardisation
sc = StandardScaler()
x_train = sc.fit_transform(x_train.reshape(B, -1)).reshape(B, C, I_h, I_w)
x_test = sc.fit_transform(x_test.reshape(B_test, -1)).reshape(B_test, C, I_h, I_w)
# one-Conversion en étiquette à chaud
def to_one_hot(data, n_class):
vec = np.zeros((len(data), n_class))
for i in range(len(data)):
vec[i, data[i]] = 1.
return vec
t_train = to_one_hot(y_train, n_class)
t_test = to_one_hot(y_test, n_class)
Cette fois, nous ne créerons pas de données de «validation». Si vous voulez le créer, utilisez la fonction train_test_split
de scikit-learn pour diviser les données d'entraînement.
Plus tard, il est standardisé en utilisant la classe StandardScaler
de scicit-learn. Il ne fait aucun traitement difficile, vous pouvez donc écrire le code vous-même. De plus, puisqu'il s'agit de reconnaissance d'image, la normalisation est OK.
Veuillez noter que la classe StandardScaler
de scikit-learn ne prend en charge que les données dont l'entrée est $ (B, N) $.
Enfin, puisque l'étiquette de réponse correcte est constituée des données numériques d'un tableau unidimensionnel de $ (60000,) $ et $ (10000,) $, changez-le en ce que l'on appelle une expression one-hot.
L'expression one-hot correspond, par exemple, dans la classification à 10 classes, à des données de réponse correctes avec une étiquette numérique de $ 3 $, comme $ [0, 0, 0, 1, 0, 0, 0, 0, 0, 0] $. Une représentation de données qui ne prend $ 1 $ que pour la pièce.
Les étiquettes correctes seront donc $ (60000, 10) $ et $ (10000, 10) $.
Ceci termine le traitement des données.
Ensuite, je présenterai le cas de l'ensemble de données MNIST de scicit-learn. Comme je l'ai mentionné au début, il s'agit d'un ensemble de données assez petit, vous pouvez donc vous sentir libre d'essayer l'apprentissage automatique.
scikit_learn_data.py
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tqdm
#Acquisition de jeux de données
n_class=10
C, I_h, I_w = 1, 8, 8
digits = datasets.load_digits()
x = digits.data
t = digits.target
n_data = len(x)
#Standardisation
sc = StandardScaler()
x = sc.fit_transform(x).reshape(n_data, I_h, I_w)
x_train, x_test, y_train, y_test = train_test_split(x, t, test_size=0.2, shuffle=True)
# one-Conversion en étiquette à chaud
def to_one_hot(data, n_class):
vec = np.zeros((len(data), n_class))
for i in range(len(data)):
vec[i, data[i]] = 1.
return vec
t_train = to_one_hot(y_train, n_class)
t_test = to_one_hot(y_test, n_class)
Ce que nous faisons est presque le même qu'à Keras. La différence est que l'ensemble de données est passé au format $ (1797, 64) $. Par conséquent, après avoir normalisé les données, elles sont «remodelées» et divisées par la fonction «train_test_split».
Une fois que l'ensemble de données est prêt, il est temps d'apprendre.
cnn_test.py
#Créer une couche de convolution et une couche de sortie
M, F_h, F_w = 10, 3, 3
lm = LayerManager((x_train, x_test), (t_train, t_test))
lm.append(name="c", I_shape=(C, I_h, I_w), F_shape=(M, F_h, F_w), pad=1,
wb_width=0.1, opt="AdaDelta", opt_dic={"eta": 1e-2})
lm.append(name="p", I_shape=lm[-1].O_shape, pool=2)
lm.append(name="m", n=100, wb_width=0.1,
opt="AdaDelta", opt_dic={"eta": 1e-2})
lm.append(name="o", n=n_class, act="softmax", err_func="Cross", wb_width=0.1,
opt="AdaDelta", opt_dic={"eta": 1e-2})
#Apprendre
epoch = 50
threshold = 1e-8
n_batch = 8
lm.training(epoch, threshold=threshold, n_batch=n_batch, show_train_error=True)
#Prédire
print("training dataset")
lm.predict(x=lm.x_train, y=lm.y_train)
print("test dataset")
lm.predict()
Cette fois, nous construisons un CNN très simple. Le nombre d'époques d'apprentissage est de 50 et la taille du mini-lot est de 8. Le reste est laissé au gestionnaire de calques lol La structure de CNN est comme indiqué dans la figure ci-dessus. Le résultat de l'exécution avec scikit-learn doit être comme indiqué dans la figure ci-dessous.
En passant, visualisons le type de données que vous avez commis une erreur.
cnn_test.py
#Afficher des données incorrectes
col=4
dpi=125
y = lm.pred_func(lm.y_test)
fail_index = np.where(y_pred != y)[0]
print("incorrect index:", fail_index)
if fail_index.size:
row = int(np.ceil(fail_index.size/col))
if row * dpi >= 2 ** 16:
row = int(np.ceil((2 ** 16 // dpi - 1)/col))
fig, ax = plt.subplots(row, col, figsize=(col, row + 1), dpi=dpi, facecolor="w")
if row != 1:
for i, f in enumerate(fail_index):
ax[i // col, i % col].imshow(lm.x_test[f], interpolation='nearest', cmap='gray')
ax[i // col, i % col].tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False)
ax[i // col, i % col].set_title(str(y[f]) + " => " + str(y_pred[f]))
if i >= row * col:
break
else:
for i, f in enumerate(fail_index):
ax[i % col].imshow(lm.x_test[f], interpolation='nearest', cmap='gray')
ax[i % col].tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False)
ax[i % col].set_title(str(y[f]) + ' => ' + str(y_pred[f]))
if i >= row * col:
break
fig.tight_layout()
Quand ceci est exécuté, cela ressemblera à la figure ci-dessous. En passant, veuillez noter qu'il est différent du résultat de l'expérience précédente. Cela semble à peine visible pour les humains ... Cela peut être mal évalué (tel quel).
Pendant l'expérience, si la taille du lot était supérieure à 1, l'apprentissage ne s'est pas bien déroulé et j'ai eu du mal. Après tout, c'était parce que la fonction d'activation n'était pas compatible avec les lots. Les fonctions d'activation ordinaires peuvent être activées par lots grâce à «numpy», mais certaines fonctions exceptionnelles, telles que la fonction «softmax», doivent être activées par lots. fait. Si quelqu'un souffre de la même manière, soyez prudent.
Recommended Posts