Skip to content

Commit

Permalink
修复工作流中多次返回导致失败bug
Browse files Browse the repository at this point in the history
  • Loading branch information
mengangkeji committed Aug 1, 2024
1 parent 692e4a7 commit 77ab4fd
Showing 1 changed file with 41 additions and 44 deletions.
85 changes: 41 additions & 44 deletions wss.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
import websockets
import threading
from .public import get_output, write_json_to_file, read_json_from_file, get_address, get_port, \
generate_unique_client_id, get_port_from_cmdline, args, find_project_root, get_token,get_workflow

generate_unique_client_id, get_port_from_cmdline, args, find_project_root, get_token, get_workflow
SERVER_1_URI = "wss://tt.9syun.com/wss"
ADDRESS = get_address()
PORT = get_port_from_cmdline()
Expand Down Expand Up @@ -128,17 +127,22 @@ async def getHistoryPrompt(prompt_id, type_a=''):
result = result[prompt_id]
status = result.get('status', {})
if status.get('completed', False):
file_num = 0
result_data.append({"type": "str", "k": 'ok', "v": '1'})
for output in result.get('outputs', {}).values():
for media in ['images', 'gifs', 'videos']:
if media in output:
for item in output[media]:
if 'filename' in item:
if item['subfolder'] != '':
item['filename'] = item['subfolder'] + '/' + item['filename']
if item['subfolder'] != '' and item['type'] == 'output':
item['filename'] = item['subfolder'] + '/' + item['filename']
file_num += 1
result_data.append({"type": 'images', "k": 'file', "v": (
args.output_directory if args.output_directory else find_project_root() + 'output') + '/' +
item['filename']})
if file_num == 0:
result_data.append({"type": "str", "k": 'ok', "v": '0', 'text': '制作失败'})
pass
else:
result_data.append({"type": "str", "k": 'ok', "v": '0', 'text': 'completed状态不对'})
else:
Expand Down Expand Up @@ -229,7 +233,7 @@ async def server1_receive_messages(websocket, message_type, message_json):
output = get_output(uniqueid + '.json')
workflow = get_workflow(uniqueid + '.json')
if output:
executor.submit(run_async_task, output, prompt_data, workflow,jilu_id)
executor.submit(run_async_task, output, prompt_data, workflow, jilu_id)
else:
if websocket.open:
websocket_queue.append({
Expand All @@ -255,23 +259,25 @@ def optimized_process_history_data(history_data_1):
pending = sorted(queue_pending, key=lambda x: int(x[0]))
pending = [item[1] for item in pending]
return running, pending


async def getMessageHistoryPrompt(result,prompt_id):
async def getMessageHistoryPrompt(result, prompt_id):
result_data = [{"type": "str", "k": 'prompt_id', "v": prompt_id}]
response_status = None
try:
file_num = 0
result_data.append({"type": "str", "k": 'ok', "v": '1'})
for media in ['images', 'gifs', 'videos']:
if media in result['output']:
for item in result['output'][media]:
if 'filename' in item:
if 'filename' in item and item['type'] == 'output':
if item['subfolder'] != '':
item['filename'] = item['subfolder'] + '/' + item['filename']
item['filename'] = item['subfolder'] + '/' + item['filename']
file_num += 1
result_data.append({"type": 'images', "k": 'file', "v": (
args.output_directory if args.output_directory else find_project_root() + 'output') + '/' +
item['filename']})

if file_num == 0:
return
pass
except Exception as e:
print_exception_in_chinese(e)
result_data.append({"type": "str", "k": 'ok', "v": '0', 'text': '异常的信息'})
Expand All @@ -294,8 +300,6 @@ async def getMessageHistoryPrompt(result,prompt_id):
await session.close()
return {'status': response_status,
'message': '操作完成.' if response_status == 200 else '发生错误.'}


async def server2_receive_messages(websocket, message_type, message_json):
global send_time
if message_type and message_type != 'crystools.monitor':
Expand Down Expand Up @@ -327,7 +331,7 @@ async def server2_receive_messages(websocket, message_type, message_json):
'type': 'send',
'prompt_id': message_json['data']['prompt_id'],
})
await getMessageHistoryPrompt(message_json['data'],message_json['data']['prompt_id'])
await getMessageHistoryPrompt(message_json['data'], message_json['data']['prompt_id'])
pass
if message_type == 'progress':
pass
Expand Down Expand Up @@ -448,14 +452,13 @@ def generate_md5_uid_timestamp_filename(original_filename):
filename = md5_hash + file_extension
return filename
async def loca_download_image(url, filename):
http_proxy = os.environ.get('http_proxy')
https_proxy = os.environ.get('https_proxy')
no_proxy = os.environ.get('no_proxy')

http_proxy = os.environ.get('http_proxy', '')
https_proxy = os.environ.get('https_proxy', '')
no_proxy = os.environ.get('no_proxy', '*')
os.environ['http_proxy'] = ''
os.environ['https_proxy'] = ''
os.environ['no_proxy'] = '*'


dir_name = find_project_root() + 'input'
no_proxy_handler = urllib.request.ProxyHandler({})
opener = urllib.request.build_opener(no_proxy_handler)
Expand All @@ -464,49 +467,43 @@ async def loca_download_image(url, filename):
response = opener.open(url)
if response.getcode() == 200:
full_path = os.path.join(dir_name, file_new_name)
if os.path.exists(full_path):
if http_proxy is not None:
os.environ['http_proxy'] = http_proxy
os.environ['https_proxy'] = https_proxy
os.environ['no_proxy'] = no_proxy
if os.path.exists(full_path):
os.environ['http_proxy'] = http_proxy
os.environ['https_proxy'] = https_proxy
os.environ['no_proxy'] = no_proxy
return {
'code': True,
'filename': file_new_name,
}
with open(full_path, 'wb') as f:
f.write(response.read())


if http_proxy is not None:
os.environ['http_proxy'] = http_proxy
os.environ['https_proxy'] = https_proxy
os.environ['no_proxy'] = no_proxy
os.environ['http_proxy'] = http_proxy
os.environ['https_proxy'] = https_proxy
os.environ['no_proxy'] = no_proxy
return {
'code': True,
'filename': file_new_name,
}
else:
if http_proxy is not None:
os.environ['http_proxy'] = http_proxy
os.environ['https_proxy'] = https_proxy
os.environ['no_proxy'] = no_proxy
os.environ['http_proxy'] = http_proxy
os.environ['https_proxy'] = https_proxy
os.environ['no_proxy'] = no_proxy
return {
'code': False,
'filename': file_new_name,
}
except Exception as e:
if http_proxy is not None:
os.environ['http_proxy'] = http_proxy
os.environ['https_proxy'] = https_proxy
os.environ['no_proxy'] = no_proxy
os.environ['http_proxy'] = http_proxy
os.environ['https_proxy'] = https_proxy
os.environ['no_proxy'] = no_proxy
return {
'code': False,
'filename': file_new_name,
}
def generate_large_random_number(num_bits):

return random.getrandbits(num_bits)
def queue_prompt(prompt,workflow, new_client_id):
def queue_prompt(prompt, workflow, new_client_id):
try:
if websocket_conn2 is not None and websocket_conn2.open:
p = {
Expand All @@ -522,7 +519,7 @@ def queue_prompt(prompt,workflow, new_client_id):
except Exception as e:
print_exception_in_chinese(e)
return {}
async def process_json_elements(json_data, prompt_data,workflow, jilu_id):
async def process_json_elements(json_data, prompt_data, workflow, jilu_id):
global websocket_conn1
try:
if 'cs_imgs' in prompt_data and prompt_data['cs_imgs']:
Expand Down Expand Up @@ -625,7 +622,7 @@ async def print_item(key, value):
'code': 1,
'prompt_id': result['prompt_id']
}
else:
else:
raise Exception('发送指令失败')
except Exception as e:
print_exception_in_chinese(e)
Expand All @@ -643,8 +640,8 @@ async def print_item(key, value):
'code': 0,
'prompt_id': jilu_id
}
def run_async_task(json_data, prompt_data,workflow, jilu_id):
return asyncio.run(process_json_elements(json_data, prompt_data, workflow,jilu_id))
def run_async_task(json_data, prompt_data, workflow, jilu_id):
return asyncio.run(process_json_elements(json_data, prompt_data, workflow, jilu_id))
def run_async_task2(prompt_id):
asyncio.run(getHistoryPrompt(prompt_id))
def task_3():
Expand Down Expand Up @@ -759,4 +756,4 @@ def thread_run():
threading.Thread(target=task5_thread).start()
executor.submit(run_task_in_loop, task_4)
async def update_worker_flow(uniqueid, data, flow_type='api/'):
write_json_to_file(data, uniqueid + '.json', 'json/'+flow_type, 'json')
write_json_to_file(data, uniqueid + '.json', 'json/' + flow_type, 'json')

0 comments on commit 77ab4fd

Please sign in to comment.