Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 324 additions & 0 deletions tracker/.ipynb_checkpoints/gmc-checkpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
import cv2
import matplotlib.pyplot as plt
import numpy as np
import copy
import time


class GMC:
def __init__(self, method='sparseOptFlow', downscale=2, verbose=None):
super(GMC, self).__init__()

self.method = method
self.downscale = max(1, int(downscale))

if self.method == 'orb':
self.detector = cv2.FastFeatureDetector_create(20)
self.extractor = cv2.ORB_create()
self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING)

elif self.method == 'sift':
self.detector = cv2.SIFT_create(nOctaveLayers=3, contrastThreshold=0.02, edgeThreshold=20)
self.extractor = cv2.SIFT_create(nOctaveLayers=3, contrastThreshold=0.02, edgeThreshold=20)
self.matcher = cv2.BFMatcher(cv2.NORM_L2)

elif self.method == 'ecc':
number_of_iterations = 5000
termination_eps = 1e-6
self.warp_mode = cv2.MOTION_EUCLIDEAN
self.criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, number_of_iterations, termination_eps)

elif self.method == 'sparseOptFlow':
self.feature_params = dict(maxCorners=1000, qualityLevel=0.01, minDistance=1, blockSize=3,
useHarrisDetector=False, k=0.04)
# self.gmc_file = open('GMC_results.txt', 'w')

elif self.method == 'file' or self.method == 'files':
seqName = verbose[0]
ablation = verbose[1]
if ablation:
filePath = r'tracker/GMC_files/MOT17_ablation'
else:
filePath = r'tracker/GMC_files/MOTChallenge'

if '-FRCNN' in seqName:
seqName = seqName[:-6]
elif '-DPM' in seqName:
seqName = seqName[:-4]
elif '-SDP' in seqName:
seqName = seqName[:-4]

self.gmcFile = open(filePath + "/GMC-" + seqName + ".txt", 'r')

if self.gmcFile is None:
raise ValueError("Error: Unable to open GMC file in directory:" + filePath)
elif self.method == 'none' or self.method == 'None':
self.method = 'none'
else:
raise ValueError("Error: Unknown CMC method:" + method)

self.prevFrame = None
self.prevKeyPoints = None
self.prevDescriptors = None

self.initializedFirstFrame = False

def apply(self, raw_frame, detections=None):
if self.method == 'orb' or self.method == 'sift':
return self.applyFeaures(raw_frame, detections)
elif self.method == 'ecc':
return self.applyEcc(raw_frame, detections)
elif self.method == 'sparseOptFlow':
return self.applySparseOptFlow(raw_frame, detections)
elif self.method == 'file':
return self.applyFile(raw_frame, detections)
elif self.method == 'none':
return np.eye(2, 3)
else:
return np.eye(2, 3)

def applyEcc(self, raw_frame, detections=None):

# Initialize
height, width, _ = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
H = np.eye(2, 3, dtype=np.float32)

# Downscale image (TODO: consider using pyramids)
if self.downscale > 1.0:
frame = cv2.GaussianBlur(frame, (3, 3), 1.5)
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
width = width // self.downscale
height = height // self.downscale

# Handle first frame
if not self.initializedFirstFrame:
# Initialize data
self.prevFrame = frame.copy()

# Initialization done
self.initializedFirstFrame = True

return H

# Run the ECC algorithm. The results are stored in warp_matrix.
# (cc, H) = cv2.findTransformECC(self.prevFrame, frame, H, self.warp_mode, self.criteria)
try:
(cc, H) = cv2.findTransformECC(self.prevFrame, frame, H, self.warp_mode, self.criteria, None, 1)
except:
print('Warning: find transform failed. Set warp as identity')

return H

def applyFeaures(self, raw_frame, detections=None):

# Initialize
height, width, _ = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
H = np.eye(2, 3)

# Downscale image (TODO: consider using pyramids)
if self.downscale > 1.0:
# frame = cv2.GaussianBlur(frame, (3, 3), 1.5)
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
width = width // self.downscale
height = height // self.downscale

# find the keypoints
mask = np.zeros_like(frame)
# mask[int(0.05 * height): int(0.95 * height), int(0.05 * width): int(0.95 * width)] = 255
mask[int(0.02 * height): int(0.98 * height), int(0.02 * width): int(0.98 * width)] = 255
if detections is not None:
for det in detections:
tlbr = (det[:4] / self.downscale).astype(np.int_)
mask[tlbr[1]:tlbr[3], tlbr[0]:tlbr[2]] = 0

keypoints = self.detector.detect(frame, mask)

# compute the descriptors
keypoints, descriptors = self.extractor.compute(frame, keypoints)

# Handle first frame
if not self.initializedFirstFrame:
# Initialize data
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.prevDescriptors = copy.copy(descriptors)

# Initialization done
self.initializedFirstFrame = True

return H

# Match descriptors.
knnMatches = self.matcher.knnMatch(self.prevDescriptors, descriptors, 2)

# Filtered matches based on smallest spatial distance
matches = []
spatialDistances = []

maxSpatialDistance = 0.25 * np.array([width, height])

# Handle empty matches case
if len(knnMatches) == 0:
# Store to next iteration
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.prevDescriptors = copy.copy(descriptors)

return H

for m, n in knnMatches:
if m.distance < 0.9 * n.distance:
prevKeyPointLocation = self.prevKeyPoints[m.queryIdx].pt
currKeyPointLocation = keypoints[m.trainIdx].pt

spatialDistance = (prevKeyPointLocation[0] - currKeyPointLocation[0],
prevKeyPointLocation[1] - currKeyPointLocation[1])

if (np.abs(spatialDistance[0]) < maxSpatialDistance[0]) and \
(np.abs(spatialDistance[1]) < maxSpatialDistance[1]):
spatialDistances.append(spatialDistance)
matches.append(m)

meanSpatialDistances = np.mean(spatialDistances, 0)
stdSpatialDistances = np.std(spatialDistances, 0)

inliesrs = (spatialDistances - meanSpatialDistances) < 2.5 * stdSpatialDistances

goodMatches = []
prevPoints = []
currPoints = []
for i in range(len(matches)):
if inliesrs[i, 0] and inliesrs[i, 1]:
goodMatches.append(matches[i])
prevPoints.append(self.prevKeyPoints[matches[i].queryIdx].pt)
currPoints.append(keypoints[matches[i].trainIdx].pt)

prevPoints = np.array(prevPoints)
currPoints = np.array(currPoints)

# Draw the keypoint matches on the output image
if 0:
matches_img = np.hstack((self.prevFrame, frame))
matches_img = cv2.cvtColor(matches_img, cv2.COLOR_GRAY2BGR)
W = np.size(self.prevFrame, 1)
for m in goodMatches:
prev_pt = np.array(self.prevKeyPoints[m.queryIdx].pt, dtype=np.int_)
curr_pt = np.array(keypoints[m.trainIdx].pt, dtype=np.int_)
curr_pt[0] += W
color = np.random.randint(0, 255, (3,))
color = (int(color[0]), int(color[1]), int(color[2]))

matches_img = cv2.line(matches_img, prev_pt, curr_pt, tuple(color), 1, cv2.LINE_AA)
matches_img = cv2.circle(matches_img, prev_pt, 2, tuple(color), -1)
matches_img = cv2.circle(matches_img, curr_pt, 2, tuple(color), -1)

plt.figure()
plt.imshow(matches_img)
plt.show()

# Find rigid matrix
if (np.size(prevPoints, 0) > 4) and (np.size(prevPoints, 0) == np.size(prevPoints, 0)):
H, inliesrs = cv2.estimateAffinePartial2D(prevPoints, currPoints, cv2.RANSAC)

# Handle downscale
if self.downscale > 1.0:
H[0, 2] *= self.downscale
H[1, 2] *= self.downscale
else:
print('Warning: not enough matching points')

# Store to next iteration
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.prevDescriptors = copy.copy(descriptors)

return H

def applySparseOptFlow(self, raw_frame, detections=None):

t0 = time.time()

# Initialize
height, width, _ = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
H = np.eye(2, 3)

# Downscale image
if self.downscale > 1.0:
# frame = cv2.GaussianBlur(frame, (3, 3), 1.5)
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))

# find the keypoints
keypoints = cv2.goodFeaturesToTrack(frame, mask=None, **self.feature_params)

# Handle first frame
if not self.initializedFirstFrame:
# Initialize data
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)

# Initialization done
self.initializedFirstFrame = True

return H

# find correspondences with error handling
try:
matchedKeypoints, status, err = cv2.calcOpticalFlowPyrLK(self.prevFrame, frame, self.prevKeyPoints, None)
except:
print('Warning: find transform failed. Set warp as identity')
# Store to next iteration
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)

return H

# leave good correspondences only
prevPoints = []
currPoints = []

for i in range(len(status)):
if status[i]:
prevPoints.append(self.prevKeyPoints[i])
currPoints.append(matchedKeypoints[i])

prevPoints = np.array(prevPoints)
currPoints = np.array(currPoints)

# Find rigid matrix
if (np.size(prevPoints, 0) > 4) and (np.size(prevPoints, 0) == np.size(prevPoints, 0)):
H, inliesrs = cv2.estimateAffinePartial2D(prevPoints, currPoints, cv2.RANSAC)

# Handle downscale
if self.downscale > 1.0:
H[0, 2] *= self.downscale
H[1, 2] *= self.downscale
else:
print('Warning: not enough matching points')

# Store to next iteration
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)

t1 = time.time()

# gmc_line = str(1000 * (t1 - t0)) + "\t" + str(H[0, 0]) + "\t" + str(H[0, 1]) + "\t" + str(
# H[0, 2]) + "\t" + str(H[1, 0]) + "\t" + str(H[1, 1]) + "\t" + str(H[1, 2]) + "\n"
# self.gmc_file.write(gmc_line)

return H

def applyFile(self, raw_frame, detections=None):
line = self.gmcFile.readline()
tokens = line.split("\t")
H = np.eye(2, 3, dtype=np.float_)
H[0, 0] = float(tokens[1])
H[0, 1] = float(tokens[2])
H[0, 2] = float(tokens[3])
H[1, 0] = float(tokens[4])
H[1, 1] = float(tokens[5])
H[1, 2] = float(tokens[6])

return H
12 changes: 10 additions & 2 deletions tracker/gmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,16 @@ def applySparseOptFlow(self, raw_frame, detections=None):

return H

# find correspondences
matchedKeypoints, status, err = cv2.calcOpticalFlowPyrLK(self.prevFrame, frame, self.prevKeyPoints, None)
# find correspondences with error handling
try:
matchedKeypoints, status, err = cv2.calcOpticalFlowPyrLK(self.prevFrame, frame, self.prevKeyPoints, None)
except:
print('Warning: find transform failed. Set warp as identity')
# Store to next iteration
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)

return H

# leave good correspondences only
prevPoints = []
Expand Down