Skip to content

Commit

Permalink
adding module and simple tests
Browse files Browse the repository at this point in the history
  • Loading branch information
stuntgoat committed Oct 30, 2012
1 parent cfe1115 commit a059d5e
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 0 deletions.
143 changes: 143 additions & 0 deletions kmeans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from collections import defaultdict
from random import uniform
from math import sqrt



def point_avg(points):
    """
    Return the center (per-dimension arithmetic mean) of `points`.

    Accepts a non-empty sequence of points, each with the same number of
    dimensions. NB. points can have more dimensions than 2; the dimension
    count is taken from the first point.

    Returns a new list of floats, one coordinate per dimension.
    Raises IndexError if `points` is empty.
    """
    dimensions = len(points[0])
    # float() keeps this a true division under Python 2 as well.
    count = float(len(points))

    # `range` and `sum` behave the same on Python 2 and 3 (unlike `xrange`).
    return [
        sum(p[dimension] for p in points) / count
        for dimension in range(dimensions)
    ]


def update_centers(data_set, assignments):
    """
    Compute the center of each assigned group.

    Accepts a dataset and a list of assignments; the indexes of both
    lists correspond to each other (assignments[i] is the group label of
    data_set[i]).

    Returns `k` centers where `k` is the number of unique assignments,
    ordered by ascending assignment label. A label that has no points
    assigned to it contributes no center, so the result may be shorter
    than the list of centers that produced `assignments`.
    """
    grouped = defaultdict(list)
    for assignment, point in zip(assignments, data_set):
        grouped[assignment].append(point)

    # Walk the labels in sorted order so the output does not depend on
    # dict iteration order (hash order on Python 2, insertion order on
    # Python 3.7+); this also avoids the Python-2-only `itervalues()`.
    return [point_avg(grouped[label]) for label in sorted(grouped)]


def assign_points(data_points, centers):
    """
    Assign every data point to its nearest center.

    Given a data set and a list of center points, assign each data point
    the index of the center it is closest to, as measured by `distance`.
    When two centers are equally close, the lower index wins.

    Returns a list of indexes into `centers`, one per element of
    `data_points`; that is, if there are N points in `data_points` the
    returned list has N elements, and if there are Y points in `centers`
    there are Y possible values within the returned list. If `centers`
    is empty every point is assigned index 0.
    """
    assignments = []
    for point in data_points:
        # A real positive infinity: the original `()` sentinel only worked
        # through Python 2's cross-type ordering and is a TypeError on
        # Python 3.
        shortest = float('inf')
        shortest_index = 0
        for index, center in enumerate(centers):
            val = distance(point, center)
            if val < shortest:
                shortest = val
                shortest_index = index
        assignments.append(shortest_index)
    return assignments


def distance(a, b):
    """
    Return the Euclidean distance between points `a` and `b`.

    Both points are indexable sequences of numbers. The number of
    dimensions is taken from `a`: any extra coordinates in `b` are
    ignored, and IndexError is raised if `b` has fewer coordinates
    than `a`.
    """
    # `range` rather than the Python-2-only `xrange`; indexing by len(a)
    # keeps the original handling of mismatched lengths.
    return sqrt(sum((a[i] - b[i]) ** 2 for i in range(len(a))))


def generate_k(data_set, k):
    """
    Generate `k` random points inside the bounding box of `data_set`.

    Given `data_set`, which is a non-empty array of arrays, find the
    minimum and maximum of each coordinate (a range per dimension; the
    dimension count is taken from the first point), then draw each
    coordinate of each new point with `uniform(min, max)` for its
    dimension.

    Returns a list of `k` points, each a list of floats.
    Raises IndexError if `data_set` is empty.
    """
    dimensions = len(data_set[0])

    # Per-dimension bounds, seeded from the first point. Plain lists
    # replace the original string-keyed `defaultdict(int)` bookkeeping.
    lows = [data_set[0][i] for i in range(dimensions)]
    highs = [data_set[0][i] for i in range(dimensions)]

    for point in data_set:
        for i in range(dimensions):
            val = point[i]
            if val < lows[i]:
                lows[i] = val
            if val > highs[i]:
                highs[i] = val

    # Same draw order as before (point by point, dimension by dimension),
    # so a seeded random generator yields the same centers.
    return [
        [uniform(lows[i], highs[i]) for i in range(dimensions)]
        for _ in range(k)
    ]


def k_means(dataset, k):
    """
    Cluster `dataset` into groups using k-means.

    Starts from `k` random centers produced by `generate_k`, then
    alternates between recomputing the centers (`update_centers`) and
    reassigning the points (`assign_points`) until the assignments stop
    changing.

    Returns a list of `(assignment, point)` tuples, one per point in
    `dataset`, where `assignment` is the index of the point's group.
    Because `update_centers` only returns centers for groups that have
    points, fewer than `k` distinct groups may be used.
    """
    centers = generate_k(dataset, k)
    assignments = assign_points(dataset, centers)
    old_assignments = None
    while assignments != old_assignments:
        centers = update_centers(dataset, assignments)
        old_assignments = assignments
        assignments = assign_points(dataset, centers)
    # list() keeps the return value a real list on Python 3, where a bare
    # zip() is a one-shot iterator.
    return list(zip(assignments, dataset))


# points = [
# [1, 2],
# [2, 1],
# [3, 1],
# [5, 4],
# [5, 5],
# [6, 5],
# [10, 8],
# [7, 9],
# [11, 5],
# [14, 9],
# [14, 14],
# ]
# print k_means(points, 3)
64 changes: 64 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import pytest

from kmeans import (
point_avg,
distance,
generate_k,
assign_points,
update_centers
)


class TestKMeans(object):
    """Simple tests for the helper functions in the `kmeans` module."""

    # NOTE(review): the `pytest_funcarg__<name>` methods use pytest's legacy
    # funcarg-factory naming; newer pytest releases expect `@pytest.fixture`
    # instead -- confirm against the pytest version in use.

    def pytest_funcarg__two_dimensional_points(self, request):
        return ((0, 10), (5, 15))

    def pytest_funcarg__three_dimensional_points(self, request):
        return ((1, 2, 6), (2, 3, 8), (4, 5, 3), (6, 7, 0))

    def pytest_funcarg__data_set(self, request):
        return ((0, 1), (1, 2), (1, 3), (10, 9), (11, 8), (9, 7))

    def pytest_funcarg__assignments(self, request):
        return [0, 0, 0, 1, 1, 1]

    def pytest_funcarg__centers(self, request):
        return [[.67, 2.0], [10.0, 8.0]]

    def test_point_avg(self, two_dimensional_points, three_dimensional_points):
        assert point_avg(two_dimensional_points) == [2.5, 12.5]
        assert point_avg(three_dimensional_points) == [3.25, 4.25, 4.25]

    def test_distance(self, two_dimensional_points, three_dimensional_points):
        # (0, 10) -> (5, 15): sqrt(25 + 25) ~= 7.07
        assert 7.06 < distance(*two_dimensional_points) < 7.08
        # (1, 2, 6) -> (2, 3, 8): sqrt(1 + 1 + 4) ~= 2.45
        first, second = three_dimensional_points[:2]
        assert 2.44 < distance(first, second) < 2.46

    def test_generate_k_two_dimensions(self, two_dimensional_points):
        for _ in range(1000):
            centers = generate_k(two_dimensional_points, 10)
            for point in centers:
                # The points are (0, 10) and (5, 15), so the per-dimension
                # ranges are 0..5 and 10..15. uniform() may return an
                # endpoint, hence the inclusive comparisons.
                assert 0 <= point[0] <= 5
                assert 10 <= point[1] <= 15

    def test_generate_k_three_dimensions(self, three_dimensional_points):
        for _ in range(1000):
            centers = generate_k(three_dimensional_points, 10)
            for point in centers:
                assert 1 <= point[0] <= 6
                assert 2 <= point[1] <= 7
                # Third coordinate; the original re-checked point[1] here.
                assert 0 <= point[2] <= 8

    def test_update_centers(self, data_set, assignments):
        centers = update_centers(data_set, assignments)
        assert len(centers) == 2

        # Round to two decimals so the centers can be compared exactly.
        rounded_centers = [
            [float('%.2f' % point[0]), float('%.2f' % point[1])]
            for point in centers
        ]
        for point in [[.67, 2.0], [10.0, 8.0]]:
            assert point in rounded_centers

    def test_assign_points(self, data_set, centers, assignments):
        assert assign_points(data_set, centers) == assignments

0 comments on commit a059d5e

Please sign in to comment.