import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Sequential


class BasicBlock(layers.Layer):

    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()

        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')

        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()

        if stride != 1:
            # shortcut branch: a 1x1 conv matches the identity to the
            # downsampled main branch in spatial size and channel count
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:
            self.downsample = lambda x: x

    def call(self, inputs, training=None):
        # inputs: [b, h, w, c]
        out = self.conv1(inputs)
        out = self.bn1(out, training=training)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out, training=training)

        identity = self.downsample(inputs)

        output = layers.add([out, identity])
        output = tf.nn.relu(output)

        return output


class ResNet(keras.Model):

    def __init__(self, layer_dims, num_classes=100):  # layer_dims e.g. [2, 2, 2, 2]
        super(ResNet, self).__init__()

        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
                                ])

        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)

        # output of layer4: [b, h, w, 512] (channels-last)
        self.avgpool = layers.GlobalAveragePooling2D()
        self.fc = layers.Dense(num_classes)

    def call(self, inputs, training=None):
        x = self.stem(inputs, training=training)

        x = self.layer1(x, training=training)
        x = self.layer2(x, training=training)
        x = self.layer3(x, training=training)
        x = self.layer4(x, training=training)

        # [b, h, w, 512] => [b, 512]
        x = self.avgpool(x)
        # [b, 512] => [b, num_classes]
        x = self.fc(x)

        return x

    def build_resblock(self, filter_num, blocks, stride=1):
        res_blocks = Sequential()
        # only the first block of a stage may downsample
        res_blocks.add(BasicBlock(filter_num, stride))

        for _ in range(1, blocks):
            res_blocks.add(BasicBlock(filter_num, stride=1))

        return res_blocks


def resnet18():
    return ResNet([2, 2, 2, 2])


def resnet34():
    return ResNet([3, 4, 6, 3])
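A quick way to sanity-check resnet.py above (a minimal sketch, not part of the original code) is to build the model and push a random batch through it, confirming the logits come out as [b, 100]:

# hypothetical smoke test for resnet.py
import tensorflow as tf
from resnet import resnet18

model = resnet18()
model.build(input_shape=(None, 32, 32, 3))
x = tf.random.normal([4, 32, 32, 3])  # fake batch of four CIFAR-sized images
logits = model(x, training=False)
print(logits.shape)  # expected: (4, 100)

The training script below then wires resnet18 up to CIFAR-100.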
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from tensorflow.keras import layers, optimizers, datasets, Sequential
from resnet import resnet18

tf.random.set_seed(2345)


def preprocess(x, y):
    # scale pixels from [0, 255] to [-0.5, 0.5]
    x = tf.cast(x, dtype=tf.float32) / 255. - 0.5
    y = tf.cast(y, dtype=tf.int32)
    return x, y


(x, y), (x_test, y_test) = datasets.cifar100.load_data()
y = tf.squeeze(y, axis=1)
y_test = tf.squeeze(y_test, axis=1)
print(x.shape, y.shape, x_test.shape, y_test.shape)

train_db = tf.data.Dataset.from_tensor_slices((x, y))
train_db = train_db.shuffle(1000).map(preprocess).batch(512)

test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_db = test_db.map(preprocess).batch(512)
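# (Aside, not in the original post: a common tweak is to append
# .prefetch(tf.data.AUTOTUNE) to both pipelines so preprocessing overlaps
# with training; older TF 2.x releases use tf.data.experimental.AUTOTUNE.)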
sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape,
      tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))


def main():
    # [b, 32, 32, 3] => [b, 100]
    model = resnet18()
    model.build(input_shape=(None, 32, 32, 3))
    model.summary()
    optimizer = optimizers.Adam(learning_rate=1e-3)

    for epoch in range(500):

        for step, (x, y) in enumerate(train_db):

            with tf.GradientTape() as tape:
                # [b, 32, 32, 3] => [b, 100]
                logits = model(x, training=True)
                # [b] => [b, 100]
                y_onehot = tf.one_hot(y, depth=100)
                # compute loss
                loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss = tf.reduce_mean(loss)

            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

            if step % 50 == 0:
                print(epoch, step, 'loss:', float(loss))

        # evaluate on the test set after each epoch
        total_num = 0
        total_correct = 0
        for x, y in test_db:
            logits = model(x, training=False)
            prob = tf.nn.softmax(logits, axis=1)
            pred = tf.argmax(prob, axis=1)
            pred = tf.cast(pred, dtype=tf.int32)

            correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
            correct = tf.reduce_sum(correct)

            total_num += x.shape[0]
            total_correct += int(correct)

        acc = total_correct / total_num
        print(epoch, 'acc:', acc)


if __name__ == '__main__':
    main()
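For reference, the same training can also be expressed with Keras' built-in compile/fit loop; this is an equivalent sketch using the train_db/test_db pipelines above, not what the original post does:

# alternative sketch: let Keras drive the training loop
model = resnet18()
model.build(input_shape=(None, 32, 32, 3))
model.compile(optimizer=optimizers.Adam(learning_rate=1e-3),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
model.fit(train_db, epochs=500, validation_data=test_db)

Since the labels here stay as integer class indices, SparseCategoricalCrossentropy replaces the manual one-hot encoding plus categorical_crossentropy of the handwritten loop.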