from __future__ import division import torch import torch.nn as nn import torch.nn.functional as F from torch.autograd import Variable import numpy as np import cv2 import matplotlib.pyplot as plt from util import count_parameters as count from util import convert2cpu as cpu from util import predict_transform class test_net(nn.Module): def __init__(self, num_layers, input_size): super(test_net, self).__init__() self.num_layers= num_layers self.linear_1 = nn.Linear(input_size, 5) self.middle = nn.ModuleList([nn.Linear(5,5) for x in range(num_layers)]) self.output = nn.Linear(5,2) def forward(self, x): x = x.view(-1) fwd = nn.Sequential(self.linear_1, *self.middle, self.output) return fwd(x) def get_test_input(): img = cv2.imread("dog-cycle-car.png") img = cv2.resize(img, (416,416)) img_ = img[:,:,::-1].transpose((2,0,1)) img_ = img_[np.newaxis,:,:,:]/255.0 img_ = torch.from_numpy(img_).float() img_ = Variable(img_) return img_ def parse_cfg(cfgfile): """ Takes a configuration file Returns a list of blocks. Each blocks describes a block in the neural network to be built. Block is represented as a dictionary in the list """ file = open(cfgfile, 'r') lines = file.read().split('\n') #store the lines in a list lines = [x for x in lines if len(x) > 0] #get read of the empty lines lines = [x for x in lines if x[0] != '#'] lines = [x.rstrip().lstrip() for x in lines] block = {} blocks = [] for line in lines: if line[0] == "[": #This marks the start of a new block if len(block) != 0: blocks.append(block) block = {} block["type"] = line[1:-1].rstrip() else: key,value = line.split("=") block[key.rstrip()] = value.lstrip() blocks.append(block) return blocks # print('\n\n'.join([repr(x) for x in blocks])) import pickle as pkl class MaxPoolStride1(nn.Module): def __init__(self, kernel_size): super(MaxPoolStride1, self).__init__() self.kernel_size = kernel_size self.pad = kernel_size - 1 def forward(self, x): padded_x = F.pad(x, (0,self.pad,0,self.pad), mode="replicate") pooled_x = nn.MaxPool2d(self.kernel_size, self.pad)(padded_x) return pooled_x class EmptyLayer(nn.Module): def __init__(self): super(EmptyLayer, self).__init__() class DetectionLayer(nn.Module): def __init__(self, anchors): super(DetectionLayer, self).__init__() self.anchors = anchors def forward(self, x, inp_dim, num_classes, confidence): x = x.data global CUDA prediction = x prediction = predict_transform(prediction, inp_dim, self.anchors, num_classes, confidence, CUDA) return prediction class Upsample(nn.Module): def __init__(self, stride=2): super(Upsample, self).__init__() self.stride = stride def forward(self, x): stride = self.stride assert(x.data.dim() == 4) B = x.data.size(0) C = x.data.size(1) H = x.data.size(2) W = x.data.size(3) ws = stride hs = stride x = x.view(B, C, H, 1, W, 1).expand(B, C, H, stride, W, stride).contiguous().view(B, C, H*stride, W*stride) return x # class ReOrgLayer(nn.Module): def __init__(self, stride = 2): super(ReOrgLayer, self).__init__() self.stride= stride def forward(self,x): assert(x.data.dim() == 4) B,C,H,W = x.data.shape hs = self.stride ws = self.stride assert(H % hs == 0), "The stride " + str(self.stride) + " is not a proper divisor of height " + str(H) assert(W % ws == 0), "The stride " + str(self.stride) + " is not a proper divisor of height " + str(W) x = x.view(B,C, H // hs, hs, W // ws, ws).transpose(-2,-3).contiguous() x = x.view(B,C, H // hs * W // ws, hs, ws) x = x.view(B,C, H // hs * W // ws, hs*ws).transpose(-1,-2).contiguous() x = x.view(B, C, ws*hs, H // ws, W // ws).transpose(1,2).contiguous() x = x.view(B, C*ws*hs, H // ws, W // ws) return x def create_modules(blocks): net_info = blocks[0] #Captures the information about the input and pre-processing module_list = nn.ModuleList() index = 0 #indexing blocks helps with implementing route layers (skip connections) prev_filters = 3 output_filters = [] for x in blocks: module = nn.Sequential() if (x["type"] == "net"): continue #If it's a convolutional layer if (x["type"] == "convolutional"): #Get the info about the layer activation = x["activation"] try: batch_normalize = int(x["batch_normalize"]) bias = False except: batch_normalize = 0 bias = True filters= int(x["filters"]) padding = int(x["pad"]) kernel_size = int(x["size"]) stride = int(x["stride"]) if padding: pad = (kernel_size - 1) // 2 else: pad = 0 #Add the convolutional layer conv = nn.Conv2d(prev_filters, filters, kernel_size, stride, pad, bias = bias) module.add_module("conv_{0}".format(index), conv) #Add the Batch Norm Layer if batch_normalize: bn = nn.BatchNorm2d(filters) module.add_module("batch_norm_{0}".format(index), bn) #Check the activation. #It is either Linear or a Leaky ReLU for YOLO if activation == "leaky": activn = nn.LeakyReLU(0.1, inplace = True) module.add_module("leaky_{0}".format(index), activn) #If it's an upsampling layer #We use Bilinear2dUpsampling elif (x["type"] == "upsample"): stride = int(x["stride"]) # upsample = Upsample(stride) upsample = nn.Upsample(scale_factor = 2, mode = "nearest") module.add_module("upsample_{}".format(index), upsample) #If it is a route layer elif (x["type"] == "route"): x["layers"] = x["layers"].split(',') #Start of a route start = int(x["layers"][0]) #end, if there exists one. try: end = int(x["layers"][1]) except: end = 0 #Positive anotation if start > 0: start = start - index if end > 0: end = end - index route = EmptyLayer() module.add_module("route_{0}".format(index), route) if end < 0: filters = output_filters[index + start] + output_filters[index + end] else: filters= output_filters[index + start] #shortcut corresponds to skip connection elif x["type"] == "shortcut": from_ = int(x["from"]) shortcut = EmptyLayer() module.add_module("shortcut_{}".format(index), shortcut) elif x["type"] == "maxpool": stride = int(x["stride"]) size = int(x["size"]) if stride != 1: maxpool = nn.MaxPool2d(size, stride) else: maxpool = MaxPoolStride1(size) module.add_module("maxpool_{}".format(index), maxpool) #Yolo is the detection layer elif x["type"] == "yolo": mask = x["mask"].split(",") mask = [int(x) for x in mask] anchors = x["anchors"].split(",") anchors = [int(a) for a in anchors] anchors = [(anchors[i], anchors[i+1]) for i in range(0, len(anchors),2)] anchors = [anchors[i] for i in mask] detection = DetectionLayer(anchors) module.add_module("Detection_{}".format(index), detection) else: print("Something I dunno") assert False module_list.append(module) prev_filters = filters output_filters.append(filters) index += 1 return (net_info, module_list) class Darknet(nn.Module): def __init__(self, cfgfile): super(Darknet, self).__init__() self.blocks = parse_cfg(cfgfile) self.net_info, self.module_list = create_modules(self.blocks) self.header = torch.IntTensor([0,0,0,0]) self.seen = 0 def get_blocks(self): return self.blocks def get_module_list(self): return self.module_list def forward(self, x, CUDA): detections = [] modules = self.blocks[1:] outputs = {} #We cache the outputs for the route layer write = 0 for i in range(len(modules)): module_type = (modules[i]["type"]) if module_type == "convolutional" or module_type == "upsample" or module_type == "maxpool": x = self.module_list[i](x) outputs[i] = x elif module_type == "route": layers = modules[i]["layers"] layers = [int(a) for a in layers] if (layers[0]) > 0: layers[0] = layers[0] - i if len(layers) == 1: x = outputs[i + (layers[0])] else: if (layers[1]) > 0: layers[1] = layers[1] - i map1 = outputs[i + layers[0]] map2 = outputs[i + layers[1]] x = torch.cat((map1, map2), 1) outputs[i] = x elif module_type == "shortcut": from_ = int(modules[i]["from"]) x = outputs[i-1] + outputs[i+from_] outputs[i] = x elif module_type == 'yolo': anchors = self.module_list[i][0].anchors #Get the input dimensions inp_dim = int (self.net_info["height"]) #Get the number of classes num_classes = int (modules[i]["classes"]) #Output the result x = x.data x = predict_transform(x, inp_dim, anchors, num_classes, CUDA) if type(x) == int: continue if not write: detections = x write = 1 else: detections = torch.cat((detections, x), 1) outputs[i] = outputs[i-1] try: return detections except: return 0 def load_weights(self, weightfile): #Open the weights file fp = open(weightfile, "rb") #The first 4 values are header information # 1. Major version number # 2. Minor Version Number # 3. Subversion number # 4. IMages seen header = np.fromfile(fp, dtype = np.int32, count = 5) self.header = torch.from_numpy(header) self.seen = self.header[3] #The rest of the values are the weights # Let's load them up weights = np.fromfile(fp, dtype = np.float32) ptr = 0 for i in range(len(self.module_list)): module_type = self.blocks[i + 1]["type"] if module_type == "convolutional": model = self.module_list[i] try: batch_normalize = int(self.blocks[i+1]["batch_normalize"]) except: batch_normalize = 0 conv = model[0] if (batch_normalize): bn = model[1] #Get the number of weights of Batch Norm Layer num_bn_biases = bn.bias.numel() #Load the weights bn_biases = torch.from_numpy(weights[ptr:ptr + num_bn_biases]) ptr += num_bn_biases bn_weights = torch.from_numpy(weights[ptr: ptr + num_bn_biases]) ptr += num_bn_biases bn_running_mean = torch.from_numpy(weights[ptr: ptr + num_bn_biases]) ptr += num_bn_biases bn_running_var = torch.from_numpy(weights[ptr: ptr + num_bn_biases]) ptr += num_bn_biases #Cast the loaded weights into dims of model weights. bn_biases = bn_biases.view_as(bn.bias.data) bn_weights = bn_weights.view_as(bn.weight.data) bn_running_mean = bn_running_mean.view_as(bn.running_mean) bn_running_var = bn_running_var.view_as(bn.running_var) #Copy the data to model bn.bias.data.copy_(bn_biases) bn.weight.data.copy_(bn_weights) bn.running_mean.copy_(bn_running_mean) bn.running_var.copy_(bn_running_var) else: #Number of biases num_biases = conv.bias.numel() #Load the weights conv_biases = torch.from_numpy(weights[ptr: ptr + num_biases]) ptr = ptr + num_biases #reshape the loaded weights according to the dims of the model weights conv_biases = conv_biases.view_as(conv.bias.data) #Finally copy the data conv.bias.data.copy_(conv_biases) #Let us load the weights for the Convolutional layers num_weights = conv.weight.numel() #Do the same as above for weights conv_weights = torch.from_numpy(weights[ptr:ptr+num_weights]) ptr = ptr + num_weights conv_weights = conv_weights.view_as(conv.weight.data) conv.weight.data.copy_(conv_weights) def save_weights(self, savedfile, cutoff = 0): if cutoff <= 0: cutoff = len(self.blocks) - 1 fp = open(savedfile, 'wb') # Attach the header at the top of the file self.header[3] = self.seen header = self.header header = header.numpy() header.tofile(fp) # Now, let us save the weights for i in range(len(self.module_list)): module_type = self.blocks[i+1]["type"] if (module_type) == "convolutional": model = self.module_list[i] try: batch_normalize = int(self.blocks[i+1]["batch_normalize"]) except: batch_normalize = 0 conv = model[0] if (batch_normalize): bn = model[1] #If the parameters are on GPU, convert them back to CPU #We don't convert the parameter to GPU #Instead. we copy the parameter and then convert it to CPU #This is done as weight are need to be saved during training cpu(bn.bias.data).numpy().tofile(fp) cpu(bn.weight.data).numpy().tofile(fp) cpu(bn.running_mean).numpy().tofile(fp) cpu(bn.running_var).numpy().tofile(fp) else: cpu(conv.bias.data).numpy().tofile(fp) #Let us save the weights for the Convolutional layers cpu(conv.weight.data).numpy().tofile(fp) # #dn = Darknet('cfg/yolov3.cfg') #dn.load_weights("yolov3.weights") #inp = get_test_input() #a, interms = dn(inp) #dn.eval() #a_i, interms_i = dn(inp)