import numpy as np
import munkres
import pt_config
# Configuration constants. Change these in pt_config, not here.

# Life of a blob, in frames, while it is not being observed.
BLOB_LIFE = pt_config.BLOB_LIFE
# Width of the image border, in pixels, which is regarded as out-of-frame.
EDGE_THRESHOLD = pt_config.EDGE_THRESHOLD
# Per-axis distance threshold, in pixels. If an observation is further than
# this from the blob's previous position on either axis, the update is ignored.
DISTANCE_THRESHOLD = pt_config.DISTANCE_THRESHOLD
# Maximum per-axis velocity of a blob. If either velocity component is outside
# this limit, the velocity is not applied when the blob is moved.
MOVE_LIMIT = pt_config.MOVE_LIMIT
# Maximum distance, in pixels, between an observation and a blob for a match
# produced by the Hungarian algorithm step to be accepted.
MATCH_DISTANCE = pt_config.MATCH_DISTANCE

# Next unique ID to hand out; incremented each time a VirtualBlob is created.
blob_id = 0
class VirtualBlob:
    """
    Represents a single pedestrian blob.

    Attributes:
        x, y: current estimated position, in pixels.
        dx, dy: smoothed velocity estimate (change in position per accepted update).
        life: remaining number of decay() calls before the blob is considered dead.
        got_updated: set to True by update_location() when an update is accepted.
        color: random 3-tuple of integers in [0, 255), for visualisation.
        id: unique ID number, taken from the module-level ``blob_id`` counter.
    """

    def __init__(self, x, y):
        """
        Create a new blob at the given (x,y) co-ordinate (in pixels). Each blob has a unique
        ID number, and a random color (for visualisation).
        """
        global blob_id
        self.x = x
        self.y = y
        self.dx = 0
        self.dy = 0
        self.life = BLOB_LIFE
        self.got_updated = False
        self.color = (
            np.random.randint(0, 255),
            np.random.randint(0, 255),
            np.random.randint(0, 255),
        )
        # Take the next ID and advance the shared module-level counter.
        self.id = blob_id
        blob_id = blob_id + 1

    def update_location(self, x, y):
        """Update the current state of the blob towards the new given position.

        The update is accepted only if the new position is less than
        DISTANCE_THRESHOLD away from the previous position on *each* axis
        independently; otherwise the call does nothing. An accepted update
        blends the velocity and position with their previous values, resets
        the life counter to BLOB_LIFE and sets ``got_updated``.
        """
        if abs(x - self.x) < DISTANCE_THRESHOLD and abs(y - self.y) < DISTANCE_THRESHOLD:
            # Velocity must be computed before the position is overwritten below.
            self.dx = 0.65 * self.dx + 0.35 * (x - self.x)
            self.dy = 0.65 * self.dy + 0.35 * (y - self.y)
            self.x = 0.6 * self.x + 0.4 * x
            self.y = 0.6 * self.y + 0.4 * y
            self.life = BLOB_LIFE
            self.got_updated = True

    def set_location(self, x, y):
        """Change the position of the blob _without_ any distance filtering or velocity calculation."""
        self.x = x
        self.y = y

    def move(self):
        """Apply the current estimated velocity to the blob; used when the blob is not observed in the scene.

        The velocity is applied only if both components are below MOVE_LIMIT in magnitude.
        """
        if abs(self.dx) < MOVE_LIMIT and abs(self.dy) < MOVE_LIMIT:
            self.x += self.dx
            self.y += self.dy

    def decay(self):
        """Age the blob by one unit. When life<=0, return True, else return False.

        Only the life counter changes here; the position is not touched.
        """
        self.life = self.life - 1
        return self.life <= 0

    def __repr__(self):
        # %d truncates the (possibly fractional) position and velocity to integers.
        return "(%d, %d, %d, %d)" % (self.x, self.y, self.dx, self.dy)
class BlobTracker:
    """The tracker object, which keeps track of a collection of pedestrian blobs.

    Attributes:
        virtual_blobs: list of the VirtualBlob objects currently being tracked.
        traces: dict mapping blob ID to a list of (x, y, frame number) tuples.
        frame: initialised to 0; not otherwise used by this class.
        is_inited: False until the first set of observations has been seen.
    """

    def __init__(self):
        """Initialise a new, empty tracker"""
        self.virtual_blobs = []
        self.traces = {}
        self.frame = 0
        self.is_inited = False

    def init_blobs(self, blobs, fnum):
        """Initialise a set of blobs, from a list of initial (x,y) co-ordinates, in the format
        [(x,y), (x,y), ... ]. Any previously tracked blobs are discarded (their traces are kept).
        ``fnum`` is the frame number recorded in each new trace."""
        self.virtual_blobs = []
        for blob in blobs:
            self._spawn(blob, fnum)
        self.is_inited = True

    def _spawn(self, blob, fnum):
        """Create a VirtualBlob at the observation's position and start its trace."""
        v = VirtualBlob(blob[0], blob[1])
        self.virtual_blobs.append(v)
        self.traces[v.id] = [(v.x, v.y, fnum)]
        return v

    def check_frame(self, blob, frame):
        """Given an (x,y) co-ordinate, return True if that position lies in the border
        region of the frame, i.e. within EDGE_THRESHOLD pixels of (or beyond) any edge,
        and False if it lies in the central region.

        ``frame`` is indexed as [x_min, y_min, x_max, y_max].
        """
        x, y = blob[0], blob[1]
        return bool(
            x < frame[0] + EDGE_THRESHOLD      # left
            or x > frame[2] - EDGE_THRESHOLD   # right
            or y < frame[1] + EDGE_THRESHOLD   # lower y bound
            or y > frame[3] - EDGE_THRESHOLD   # upper y bound
        )

    def track_blobs(self, blobs, frame, fnum):
        """Main update call. Takes a list of new, observed blob co-ordinates, a rectangular
        frame specifier and a frame number, and updates the positions of the virtual blobs.

        ``frame`` is indexed as [x_min, y_min, x_max, y_max] (see check_frame).

        On the first call the observations simply initialise the tracker. On later calls:
        every virtual blob is moved by its velocity; observations are matched to virtual
        blobs with the Hungarian algorithm; accepted matches update the blob; observations
        that are unmatched, or matched too far away, spawn a new blob only if they are in
        the border region; finally, blobs that were not updated are aged and dead ones removed.
        """
        # initialise if not already done so
        if not self.is_inited:
            self.init_blobs(blobs, fnum)
            return

        # predict: advance every blob by its estimated velocity
        for v in self.virtual_blobs:
            v.move()

        # Freeze the counts now. Blobs spawned during matching are appended to
        # self.virtual_blobs, and must not be mistaken for matchable columns.
        n_blobs = len(blobs)
        n_virtual = len(self.virtual_blobs)
        max_size = max(n_blobs, n_virtual)

        # Square cost matrix: observations on rows, virtual blobs on columns.
        # Padding rows/columns (no real counterpart) keep their zero cost.
        distance_matrix = np.zeros((max_size, max_size))
        for i in range(n_blobs):
            for j in range(n_virtual):
                dx = blobs[i][0] - self.virtual_blobs[j].x
                dy = blobs[i][1] - self.virtual_blobs[j].y
                distance_matrix[i, j] = np.sqrt(dx**2 + dy**2)

        # Keep a pristine copy of the distances for the threshold test, in case
        # the solver modifies the matrix it is given.
        copy_distances = np.array(distance_matrix)
        if max_size:
            assignment = munkres.Munkres().compute(distance_matrix)
        else:
            # nothing observed and nothing tracked: skip the solver entirely
            assignment = []

        # clear the update flag
        for v in self.virtual_blobs:
            v.got_updated = False

        # blobs on rows
        for i, matching_virtual in assignment:
            if i >= n_blobs:
                # padding row: a virtual blob with no observation this frame
                continue
            blob = blobs[i]
            if matching_virtual < n_virtual:
                if copy_distances[i][matching_virtual] < MATCH_DISTANCE:
                    self.virtual_blobs[matching_virtual].update_location(blob[0], blob[1])
                elif self.check_frame(blob, frame):
                    # matched, but too far away: treat as a new arrival at the border
                    self._spawn(blob, fnum)
            elif self.check_frame(blob, frame):
                # new baby blobs! Unmatched observations only spawn at the border.
                self._spawn(blob, fnum)

        # deal with un-updated blobs
        # NOTE: blobs spawned above have got_updated == False, so they are
        # also aged and traced in this pass.
        graveyard = []
        for v in self.virtual_blobs:
            if not v.got_updated:
                # reduce life counter
                if v.decay():
                    graveyard.append(v)
            # append trace of blob movement
            self.traces[v.id].append((v.x, v.y, fnum))

        # clean up the bodies
        for v in graveyard:
            self.virtual_blobs.remove(v)