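"""Async agent tests: an async group chat and an async streaming-data reply flow."""
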
import asyncio

import pytest

import autogen
from test_assistant_agent import KEY_LOC, OAI_CONFIG_LIST


def get_market_news(ind, ind_upper):
    data = {
        "feed": [
            {
                "title": "Palantir CEO Says Our Generation's Atomic Bomb Could Be AI Weapon - And Arrive Sooner Than You Think - Palantir Technologies ( NYSE:PLTR ) ",
                "summary": "Christopher Nolan's blockbuster movie \"Oppenheimer\" has reignited the public discourse surrounding the United States' use of an atomic bomb on Japan at the end of World War II.",
                "overall_sentiment_score": 0.009687,
            },
            {
                "title": '3 "Hedge Fund Hotels" Pulling into Support',
                "summary": "Institutional quality stocks have several benefits including high-liquidity, low beta, and a long runway. Strategist Andrew Rocco breaks down what investors should look for and pitches 3 ideas.",
                "banner_image": "https://staticx-tuner.zacks.com/images/articles/main/92/87.jpg",
                "overall_sentiment_score": 0.219747,
            },
            {
                "title": "PDFgear, Bringing a Completely-Free PDF Text Editing Feature",
                "summary": "LOS ANGELES, July 26, 2023 /PRNewswire/ -- PDFgear, a leading provider of PDF solutions, announced a piece of exciting news for everyone who works extensively with PDF documents.",
                "overall_sentiment_score": 0.360071,
            },
            {
                "title": "Researchers Pitch 'Immunizing' Images Against Deepfake Manipulation",
                "summary": "A team at MIT says injecting tiny disruptive bits of code can cause distorted deepfake images.",
                "overall_sentiment_score": -0.026894,
            },
            {
                "title": "Nvidia wins again - plus two more takeaways from this week's mega-cap earnings",
                "summary": "We made some key conclusions combing through quarterly results for Microsoft and Alphabet and listening to their conference calls with investors.",
                "overall_sentiment_score": 0.235177,
            },
        ]
    }
    feeds = data["feed"][ind:ind_upper]
    feeds_summary = "\n".join(
        [
            f"News summary: {f['title']}. {f['summary']} overall_sentiment_score: {f['overall_sentiment_score']}"
            for f in feeds
        ]
    )
    return feeds_summary


@pytest.mark.asyncio
async def test_async_groupchat():
    # skip the test when the openai package is not installed
    try:
        import openai
    except ImportError:
        return

    config_list = autogen.config_list_from_json(OAI_CONFIG_LIST, KEY_LOC)

    llm_config = {
        "timeout": 600,
        "cache_seed": 41,
        "config_list": config_list,
        "temperature": 0,
    }

    # create an AssistantAgent instance named "assistant"
    assistant = autogen.AssistantAgent(
        name="assistant",
        llm_config=llm_config,
        system_message="You are a helpful assistant. Reply 'TERMINATE' to end the conversation.",
    )
    # create a UserProxyAgent instance named "user"
    user_proxy = autogen.UserProxyAgent(
        name="user",
        human_input_mode="NEVER",
        max_consecutive_auto_reply=5,
        code_execution_config=False,
        default_auto_reply=None,
    )
    groupchat = autogen.GroupChat(agents=[user_proxy, assistant], messages=[], max_round=12)
    manager = autogen.GroupChatManager(
        groupchat=groupchat,
        llm_config=llm_config,
        # guard against messages whose content is None (e.g. function-call messages)
        is_termination_msg=lambda x: "TERMINATE" in (x.get("content", "") or ""),
    )
    await user_proxy.a_initiate_chat(manager, message="""Have a short conversation with the assistant.""")
    assert len(user_proxy.chat_messages) > 0


@pytest.mark.asyncio
async def test_stream():
    # skip the test when the openai package is not installed
    try:
        import openai
    except ImportError:
        return

    config_list = autogen.config_list_from_json(OAI_CONFIG_LIST, KEY_LOC)
    data = asyncio.Future()

    async def add_stock_price_data():
        # simulating the data stream: append one news item at a time
        for i in range(0, 2, 1):
            latest_news = get_market_news(i, i + 1)
            if data.done():
                data.result().append(latest_news)
            else:
                data.set_result([latest_news])
            # print(data.result())
            await asyncio.sleep(5)

    data_task = asyncio.create_task(add_stock_price_data())

    # create an AssistantAgent instance named "assistant"
    assistant = autogen.AssistantAgent(
        name="assistant",
        llm_config={
            "timeout": 600,
            "cache_seed": 41,
            "config_list": config_list,
            "temperature": 0,
        },
        system_message="You are a financial expert.",
    )
    # create a UserProxyAgent instance named "user"
    user_proxy = autogen.UserProxyAgent(
        name="user",
        human_input_mode="NEVER",
        max_consecutive_auto_reply=5,
        code_execution_config=False,
        default_auto_reply=None,
    )

    async def add_data_reply(recipient, messages, sender, config):
        # inject any freshly streamed news into the conversation before the default reply runs
        await asyncio.sleep(0.1)
        data = config["news_stream"]
        if data.done():
            result = data.result()
            if result:
                news_str = "\n".join(result)
                result.clear()
                return (
                    True,
                    f"Just got some latest market news. Merge your new suggestion with previous ones.\n{news_str}",
                )
        return False, None

    user_proxy.register_reply(autogen.AssistantAgent, add_data_reply, position=2, config={"news_stream": data})

    await user_proxy.a_initiate_chat(
        assistant,
        message="""Give me investment suggestion in 3 bullet points.""",
    )
    # keep the exchange going while the simulated data stream is still producing news
    while not data_task.done() and not data_task.cancelled():
        reply = await user_proxy.a_generate_reply(sender=assistant)
        if reply is not None:
            await user_proxy.a_send(reply, assistant)


if __name__ == "__main__":
asyncio.run(test_stream())