-
Notifications
You must be signed in to change notification settings - Fork 1
/
net_wol.py
149 lines (111 loc) · 4.37 KB
/
net_wol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import socket
import network
import utime
import credentials_cache
import device_manager
import html_cache
import http_utils
import status_light
# Connects to local network, returns IPv4 of device
def connect():
# Takes first and second line of credentials file and takes the ssid and password from them
ssid = credentials_cache.get_ssid()
password = credentials_cache.get_inet_password()
# Connect to the network using ssid and password
wlan = network.WLAN(network.STA_IF)
# Disable power save
wlan.config(pm=0xa11140)
wlan.active(True)
wlan.connect(str(ssid), str(password))
# Make sure that it gets connected to the network
while not wlan.isconnected():
utime.sleep(1)
status_light.send_blinks(1)
return str(wlan.ifconfig()[0])
# Opens socket to listen for http requests
def listen(ip):
# Format address
address = (ip, 80)
# Bind and listen on port for http requests
connection = socket.socket()
# Make sure that a connection can not block
connection.settimeout(30)
connection.bind(address)
connection.listen(1)
return (connection)
# Creates broadcast socket to be used for sending wake-on-lan packets
def establish_wol_socket():
wol_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return wol_socket
# Checks if an html form button was pressed and updates the html page
def serve(connection, wol_socket):
# Initialize reference to network interface to check if connected to network
wlan = network.WLAN(network.STA_IF)
# Initialize system timer variables
first_enable = True
first_enable_timestamp = -1
while wlan.isconnected():
# Check every refresh loop if the device has been enabled and log start-time
device_on = device_manager.get_status(credentials_cache.get_target_ip())
if (device_on):
if first_enable:
first_enable_timestamp = utime.time()
first_enable = False
else:
first_enable = True
try:
# Accept client connection and reads request
client = connection.accept()[0]
except OSError:
# Refreshes the listener on the timeout interval to clear invalid requests
# Update device up-timer between listening refreshes
device_manager.get_status(credentials_cache.get_target_ip())
status_light.send_blinks(1)
continue
try:
request = str(client.recv(1024))
except OSError:
# If request could not be parsed, close connection and continue from top
status_light.send_blinks(2)
client.close()
continue
send_status, post_body_found = http_utils.parse_post(request, wol_socket, override_post_check=False)
# If password fields could not be found, body was not send along with the header, need to receive again
if not post_body_found:
status_light.send_blinks(1)
try:
# Receive body
request = str(client.recv(1024))
except OSError:
# Error in accepting request
status_light.send_blinks(2)
client.close()
continue
# Retry process, but if it fails do not try again
send_status = http_utils.parse_post(request, wol_socket, override_post_check=True)[0]
# Generate and send new html page
if device_on:
if first_enable:
# Reset time of first enable
first_enable_timestamp = utime.time()
# Find device runtime
device_runtime = utime.time() - first_enable_timestamp
html = html_cache.get_html("on", device_runtime, send_status)
# Reset first_enable status when enabled for first time
first_enable = False
else:
html = html_cache.get_html("off", -1, send_status)
# Reset first_enable status when device powers off
first_enable = True
try:
client.send(html)
except OSError:
# Error in sending request
status_light.send_blinks(2)
client.close()
continue
# Wait for html to be finished sending
utime.sleep(1)
client.close()
# Send status that request was fulfilled
status_light.send_blinks(3)