forked from hankcs/hmm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhmm_mp.py
146 lines (134 loc) · 4.59 KB
/
hmm_mp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import numpy as np
#import ghmm # not used now
import multiprocessing
import hmm
import logging
class MultiProcessHMM(hmm.HMM):
"""Implementation of HMM with multiprocessing.
Using multiprocessing module to fasten calculation of estimation
step."""
def __init__(self, t, e, i, worker_num=2):
"""Constructer."""
hmm.HMM.__init__(self, t, e, i)
self.worker_num = worker_num
def baum_welch(self,
observations,
iter_limit=1000,
threshold=1e-5,
pseudocounts=[0, 0, 0],
worker_num=None):
"""Perform Baum-Welch algorithm.
Require a list of observations."""
worker_num = worker_num if worker_num is not None else self.worker_num
x_digits = [ np.array(
[[1 if x[n] == i else 0 for i in xrange(self._M)]
for n in xrange(len(x))] ).T
for x in observations]
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
workers = [Worker(i, tasks, results) for i in xrange(worker_num)]
for w in workers:
w.start()
logging.info("Starting process %d...", w.id_num)
l_prev = 0
for n in xrange(iter_limit):
for i in xrange(len(observations)):
tasks.put(Estimator(observations[i], self._t, self._e, self._i, i))
tasks.join()
estimations = {}
for i in xrange(len(observations)):
estimations.update(results.get())
gammas, xisums, cs = np.array(
[estimations[i] for i in xrange(len(observations))]
).T
### do something
l = self.maximize(gammas, xisums, cs, x_digits)
if hmm.has_positive(pseudocounts):
self.add_pseudocounts(pseudocounts)
dif = l - l_prev
print n, l, dif
l_prev = l
if n > 0 and dif < threshold:
for i in xrange(worker_num):
tasks.put(None)
break
class Worker(multiprocessing.Process):
"""Tasks for estimation step."""
def __init__(self, number, tasks, results):
"""Requires a list of observations."""
multiprocessing.Process.__init__(self)
self.id_num = number
self.tasks = tasks
self.results = results
print "Process (%d) has been made." % self.id_num
def run(self):
"""Run calculation."""
while True:
next_task = self.tasks.get()
if next_task is None:
print "%d exiting..." % self.id_num
self.tasks.task_done()
break
print "Proccess (%d) calculating estimations..." % self.id_num
estimation = next_task()
self.tasks.task_done()
self.results.put(estimation)
return
class Estimator(object):
def __init__(self, x, t, e, i, seq_number):
self.x = x
self.t = t
self.e = e
self.i = i
self.seq_number = seq_number
def __call__(self):
"""Calculate alpha
"""
x = self.x
t = self.t
e = self.e
i = self.i
N = len(x)
K = len(t[0])
# \hat{alpha}: p(z_n | x_1, ..., x_n)
alpha = np.zeros([N, K], float)
alpha[0] = i * e[x[0]]
alpha[0] /= alpha[0].sum()
beta = np.zeros([N, K], float)
beta[-1] = 1.0
c = np.zeros([N], float)
c[0] = alpha[0].sum()
# Calculate Alpha
for n in xrange(1, N):
a = e[x[n]] * np.dot(alpha[n -1], t)
c[n] = a.sum()
alpha[n] = a / c[n]
# Calculate Beta
for n in xrange(N - 2, -1, -1):
beta[n] = np.dot(beta[n + 1] * e[x[n + 1]], t.T) / c[n + 1]
gamma = alpha * beta
xisum = sum(
np.outer(alpha[n-1], e[x[n]] * beta[n]) / c[n] for n in xrange(1, N)
) * t
return {self.seq_number: [gamma, xisum, c]}
def split_data(x, num):
"""Split the data set x into num subsets."""
data_num = len(x)
num_per_subset = data_num / num
frac = data_num % num
subsets = {}
splitted = 0
for i in xrange(num):
start = splitted
end = start + num_per_subset
if frac > 0:
end += 1
frac -= 1
splitted = end
subsets[i] = x[start:end+1]
return subsets
def merge_results(xs, num):
"""Merge estimation into one numpy.array object."""
return np.array([xs[i] for i in xrange(num)]).T