Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass store port correctly to spawned actors #186

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions demos/minimal/actors/sample_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def runStep(self):

frame = None
try:
frame = self.q_in.get(timeout=0.001)
frame = self.q_in.get(timeout=0.05)

except Exception:
logger.error("Could not get frame!")
Expand All @@ -59,9 +59,9 @@ def runStep(self):
self.frame = self.client.get(frame)
avg = np.mean(self.frame[0])

# logger.info(f"Average: {avg}")
logger.info(f"Average: {avg}")
self.avg_list.append(avg)
# logger.info(f"Overall Average: {np.mean(self.avg_list)}")
# logger.info(f"Frame number: {self.frame_num}")
logger.info(f"Overall Average: {np.mean(self.avg_list)}")
logger.info(f"Frame number: {self.frame_num}")

self.frame_num += 1
12 changes: 7 additions & 5 deletions demos/minimal/actors/sample_spawn_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from improv.actor import Actor, RunManager
from improv.actor import Actor
import numpy as np
from queue import Empty
import logging
Expand Down Expand Up @@ -52,7 +52,7 @@ def runStep(self):
frame = self.q_in.get(timeout=0.05)
except Empty:
pass
except:
except Exception:
logger.error("Could not get frame!")
pass

Expand All @@ -63,8 +63,10 @@ def runStep(self):
else:
self.frame = self.client.get(frame)
avg = np.mean(self.frame[0])
print(f"Average: {avg}")

logger.info(f"Average: {avg}")
self.avg_list.append(avg)
print(f"Overall Average: {np.mean(self.avg_list)}")
print(f"Frame number: {self.frame_num}")
logger.info(f"Overall Average: {np.mean(self.avg_list)}")
logger.info(f"Frame number: {self.frame_num}")

self.frame_num += 1
3 changes: 3 additions & 0 deletions demos/minimal/minimal_spawn.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ actors:

connections:
Generator.q_out: [Processor.q_in]

redis_config:
port: 6378
4 changes: 2 additions & 2 deletions demos/sample_actors/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
class MeanAnalysis(Actor):
# TODO: Add additional error handling
# TODO: this is too complex for a sample actor?
def __init__(self, *args):
super().__init__(*args)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def setup(self, param_file=None):
"""Set custom parameters here
Expand Down
4 changes: 2 additions & 2 deletions demos/sample_actors/analysis_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class AnalysisAsync(Analysis):

"""

def __init__(self, *args):
super().__init__(*args)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self.frame_number = 0
self.aqueue = None
Expand Down
4 changes: 2 additions & 2 deletions demos/sample_actors/analysis_julia.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ class JuliaAnalysis(Actor):
This actor puts in the data store the average frame intensity.
"""

def __init__(self, *args, julia_file="julia_func.jl"):
def __init__(self, *args, julia_file="julia_func.jl", **kwargs):
"""julia_file: path to .jl file(s) for analyses computed in Julia"""

super().__init__(*args)
super().__init__(*args, **kwargs)

self.julia = None
self.julia_file = julia_file
Expand Down
75 changes: 54 additions & 21 deletions demos/sample_actors/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ class CaimanProcessor(Actor):
"""

def __init__(
self, *args, init_filename="data/Tolias_mesoscope_2.hdf5", config_file=None
self,
*args,
init_filename="data/Tolias_mesoscope_2.hdf5",
config_file=None,
**kwargs,
):
super().__init__(*args)
super().__init__(*args, **kwargs)
logger.info("initfile {}, config file {}".format(init_filename, config_file))
self.param_file = config_file
self.init_filename = init_filename
Expand Down Expand Up @@ -73,7 +77,9 @@ def setup(self):
self.counter = 0

def stop(self):
print("Processor broke, avg time per frame: ", np.mean(self.total_times, axis=0))
print(
"Processor broke, avg time per frame: ", np.mean(self.total_times, axis=0)
)
print("Processor got through ", self.frame_number, " frames")
np.savetxt("output/timing/process_frame_time.txt", np.array(self.total_times))
np.savetxt("output/timing/process_timestamp.txt", np.array(self.timestamp))
Expand All @@ -85,19 +91,26 @@ def stop(self):
np.savetxt("output/timing/shape_time.txt", self.shape_time)
np.savetxt("output/timing/detect_time.txt", self.detect_time)

np.savetxt("output/timing/putAnalysis_time.txt", np.array(self.putAnalysis_time))
np.savetxt(
"output/timing/putAnalysis_time.txt", np.array(self.putAnalysis_time)
)
np.savetxt("output/timing/procFrame_time.txt", np.array(self.procFrame_time))

print("Number of times coords updated ", self.counter)

if self.onAc.estimates.OASISinstances is not None:
try:
init = self.params["init_batch"]
S = np.stack([osi.s[init:] for osi in self.onAc.estimates.OASISinstances])
S = np.stack(
[osi.s[init:] for osi in self.onAc.estimates.OASISinstances]
)
np.savetxt("output/end_spikes.txt", S)
except Exception as e:
logger.error("Exception {}: {} during frame number {}"
.format(type(e).__name__, e, self.frame_number))
logger.error(
"Exception {}: {} during frame number {}".format(
type(e).__name__, e, self.frame_number
)
)
print(traceback.format_exc())
else:
print("No OASIS")
Expand Down Expand Up @@ -125,22 +138,30 @@ def runStep(self):
try:
self.frame = self.client.getID(frame[0][str(self.frame_number)])
t2 = time.time()
self._fitFrame(self.frame_number + init, self.frame.reshape(-1, order="F"))
self._fitFrame(
self.frame_number + init, self.frame.reshape(-1, order="F")
)
self.fitframe_time.append([time.time() - t2])
self.putEstimates()
self.timestamp.append([time.time(), self.frame_number])
except ObjectNotFoundError:
logger.error("Processor: Frame {} unavailable from store, droppping"
.format(self.frame_number))
logger.error(
"Processor: Frame {} unavailable from store, droppping".format(
self.frame_number
)
)
self.dropped_frames.append(self.frame_number)
self.q_out.put([1])
except KeyError as e:
logger.error("Processor: Key error... {0}".format(e))
# Proceed at all costs
self.dropped_frames.append(self.frame_number)
except Exception as e:
logger.error("Processor error: {}: {} during frame number {}"
.format(type(e).__name__, e, self.frame_number))
logger.error(
"Processor error: {}: {} during frame number {}".format(
type(e).__name__, e, self.frame_number
)
)
print(traceback.format_exc())
self.dropped_frames.append(self.frame_number)
self.frame_number += 1
Expand Down Expand Up @@ -207,9 +228,11 @@ def putEstimates(self):
t = time.time()
nb = self.onAc.params.get("init", "nb")
A = self.onAc.estimates.Ab[:, nb:]
before = self.params["init_batch"]
before = self.params["init_batch"]
# self.frame_number-500 if self.frame_number > 500 else 0
C = self.onAc.estimates.C_on[nb : self.onAc.M, before : self.frame_number + before] # .get_ordered()
C = self.onAc.estimates.C_on[
nb : self.onAc.M, before : self.frame_number + before
] # .get_ordered()
t2 = time.time()
t3 = time.time()

Expand All @@ -232,7 +255,9 @@ def putEstimates(self):

# self.q_comm.put([self.frame_number])

self.putAnalysis_time.append([time.time() - t, t2 - t, t3 - t2, t4 - t3, t5 - t4, t6 - t5])
self.putAnalysis_time.append(
[time.time() - t, t2 - t, t3 - t2, t4 - t3, t5 - t4, t6 - t5]
)

def _checkFrames(self):
"""Check to see if we have frames for processing"""
Expand Down Expand Up @@ -281,13 +306,21 @@ def makeImage(self):
try:
# components = self.onAc.estimates.Ab[:,mn:].dot(self.onAc.estimates.C_on[mn:self.onAc.M,(self.frame_number-1)%self.onAc.window]).reshape(self.onAc.dims, order='F')
# background = self.onAc.estimates.Ab[:,:mn].dot(self.onAc.estimates.C_on[:mn,(self.frame_number-1)%self.onAc.window]).reshape(self.onAc.dims, order='F')
components = (self.onAc.estimates.Ab[:, mn:]
.dot(self.onAc.estimates.C_on[mn : self.onAc.M, (self.frame_number - 1)])
.reshape(self.onAc.dims, order="F"))
background = (self.onAc.estimates.Ab[:, :mn]
components = (
self.onAc.estimates.Ab[:, mn:]
.dot(
self.onAc.estimates.C_on[mn : self.onAc.M, (self.frame_number - 1)]
)
.reshape(self.onAc.dims, order="F")
)
background = (
self.onAc.estimates.Ab[:, :mn]
.dot(self.onAc.estimates.C_on[:mn, (self.frame_number - 1)])
.reshape(self.onAc.dims, order="F"))
image = ((components + background) - self.onAc.bnd_Y[0]) / np.diff(self.onAc.bnd_Y)
.reshape(self.onAc.dims, order="F")
)
image = ((components + background) - self.onAc.bnd_Y[0]) / np.diff(
self.onAc.bnd_Y
)
image = np.minimum((image * 255.0), 255).astype("u1")
except ValueError as ve:
logger.info("ValueError: {0}".format(ve))
Expand Down
4 changes: 2 additions & 2 deletions demos/sample_actors/simple_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

class SimpleAnalysis(Actor):
# TODO: Add additional error handling
def __init__(self, *args):
super().__init__(*args)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def setup(self, param_file=None):
"""Set custom parameters here
Expand Down
45 changes: 30 additions & 15 deletions demos/sample_actors/zmqActor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@
import time

import zmq
from zmq import PUB, SUB, SUBSCRIBE, REQ, REP, LINGER, Again, NOBLOCK, ZMQError, EAGAIN, ETERM
from zmq import (
PUB,
SUB,
SUBSCRIBE,
REQ,
REP,
LINGER,
Again,
NOBLOCK,
ZMQError,
EAGAIN,
ETERM,
)
from zmq.log.handlers import PUBHandler
import zmq.asyncio

Expand All @@ -18,12 +30,14 @@ class ZmqActor(Actor):
"""
Zmq actor with pub/sub or rep/req pattern.
"""
def __init__(self, *args, type='PUB', ip='127.0.0.1', port=5555, **kwargs):

def __init__(self, *args, type="PUB", ip="127.0.0.1", port=5555, **kwargs):
super().__init__(*args, **kwargs)
logger.info("Constructed Zmq Actor")
if str(type) in 'PUB' or str(type) in 'SUB':
self.pub_sub_flag = True #default
else: self.pub_sub_flag = False
if str(type) in "PUB" or str(type) in "SUB":
self.pub_sub_flag = True # default
else:
self.pub_sub_flag = False
self.rep_req_flag = not self.pub_sub_flag
self.ip = ip
self.port = port
Expand All @@ -40,14 +54,14 @@ def sendMsg(self, msg, msg_type="pyobj"):
"""
Sends a message to the controller.
"""
if not self.send_socket:
if not self.send_socket:
self.setSendSocket()

if msg_type == "multipart":
self.send_socket.send_multipart(msg)
if msg_type == "pyobj":
self.send_socket.send_pyobj(msg)
elif msg_type == "single":
elif msg_type == "single":
self.send_socket.send(msg)

def recvMsg(self, msg_type="pyobj", flags=0):
Expand All @@ -56,15 +70,16 @@ def recvMsg(self, msg_type="pyobj", flags=0):

NOTE: default flag=0 instead of flag=NOBLOCK
"""
if not self.recv_socket: self.setRecvSocket()

if not self.recv_socket:
self.setRecvSocket()

while True:
try:
if msg_type == "multipart":
recv_msg = self.recv_socket.recv_multipart(flags=flags)
elif msg_type == "pyobj":
recv_msg = self.recv_socket.recv_pyobj(flags=flags)
elif msg_type == "single":
elif msg_type == "single":
recv_msg = self.recv_socket.recv(flags=flags)
break
except Again:
Expand Down Expand Up @@ -134,29 +149,29 @@ def replyMsg(self, reply, delay=0.0001):
self.setRepSocket()

msg = self.rep_socket.recv_pyobj()
time.sleep(delay)
time.sleep(delay)
self.rep_socket.send_pyobj(reply)
self.rep_socket.close()

return msg

def put(self, msg=None):
logger.debug(f'Putting message {msg}')
logger.debug(f"Putting message {msg}")
if self.pub_sub_flag:
logger.debug(f"putting message {msg} using pub/sub")
return self.sendMsg(msg)
else:
logger.debug(f"putting message {msg} using rep/req")
return self.requestMsg(msg)

def get(self, reply=None):
if self.pub_sub_flag:
logger.debug(f"getting message with pub/sub")
return self.recvMsg()
else:
logger.debug(f"getting message using reply {reply} with pub/sub")
return self.replyMsg(reply)

def setSendSocket(self, timeout=1.001):
"""
Sets up the send socket for the actor.
Expand All @@ -173,7 +188,7 @@ def setRecvSocket(self, timeout=1.001):
self.recv_socket.connect(self.address)
self.recv_socket.setsockopt(SUBSCRIBE, b"")
time.sleep(timeout)

def setReqSocket(self, timeout=0.0001):
"""
Sets up the request socket for the actor.
Expand Down
Loading
Loading