forked from vert-x3/vertx-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.rb
101 lines (73 loc) · 3.18 KB
/
server.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
require 'vertx-mqtt-server/mqtt_server'
require 'vertx/buffer'
# Example MQTT server built on the Vert.x MQTT server API.
#
# For every client that connects, the endpoint handler below logs the CONNECT
# details, accepts the connection and installs handlers for subscribe,
# unsubscribe, ping, disconnect, close, and for the QoS 1 / QoS 2 publish
# exchanges in both directions. The server is then bound to 0.0.0.0:1883.
mqtt_server = VertxMqttServer::MqttServer.create($vertx)

mqtt_server.endpoint_handler { |endpoint|
  # shows main connect info
  puts "MQTT client [#{endpoint.client_identifier}] request to connect, clean session = #{endpoint.clean_session?}"

  auth = endpoint.auth
  if auth
    # NOTE(review): this prints the client's password in clear text. Kept
    # because this is a demo of the API; do not log credentials in production.
    puts "[username = #{auth.user_name}, password = #{auth.password}]"
  end

  will = endpoint.will
  if will
    puts "[will flag = #{will.will_flag?} topic = #{will.will_topic} msg = #{will.will_message} QoS = #{will.will_qos} isRetain = #{will.will_retain?}]"
  end

  puts "[keep alive timeout = #{endpoint.keep_alive_time_seconds}]"

  # accept connection from the remote client (argument: session present = false)
  endpoint.accept(false)

  # Handlers for the QoS 1 and QoS 2 exchanges of messages published BY this
  # server. They are registered once per endpoint, before anything is
  # published, instead of being re-registered on every SUBSCRIBE.
  endpoint.publish_acknowledge_handler { |message_id|
    puts "Received ack for message = #{message_id}"
  }.publish_received_handler { |message_id|
    endpoint.publish_release(message_id)
  }.publish_completion_handler { |message_id|
    puts "Received ack for message = #{message_id}"
  }

  # handling requests for subscriptions
  endpoint.subscribe_handler do |subscribe|
    subscriptions = subscribe.topic_subscriptions

    # grant every subscription exactly the QoS level the client asked for
    granted_qos_levels = subscriptions.map do |subscription|
      puts "Subscription for #{subscription.topic_name} with QoS #{subscription.quality_of_service}"
      subscription.quality_of_service
    end

    # ack the subscriptions request
    endpoint.subscribe_acknowledge(subscribe.message_id, granted_qos_levels)

    # just as example, publish a message on the first topic with requested QoS
    # (skipped when the request carries no topic, rather than failing on nil)
    first_subscription = subscriptions.first
    if first_subscription
      endpoint.publish(
        first_subscription.topic_name,
        Vertx::Buffer.buffer("Hello from the Vert.x MQTT server"),
        first_subscription.quality_of_service,
        false,
        false
      )
    end
  end

  # handling requests for unsubscriptions
  endpoint.unsubscribe_handler do |unsubscribe|
    unsubscribe.topics.each do |topic|
      puts "Unsubscription for #{topic}"
    end
    # ack the unsubscription request
    endpoint.unsubscribe_acknowledge(unsubscribe.message_id)
  end

  # handling ping from client
  endpoint.ping_handler do |_v|
    puts "Ping received from client"
  end

  # handling disconnect message
  endpoint.disconnect_handler do |_v|
    puts "Received disconnect from client"
  end

  # handling closing connection
  endpoint.close_handler do |_v|
    puts "Connection closed"
  end

  # handling incoming published messages: answer with the acknowledgement
  # that matches the message's QoS level (nothing is sent for other levels)
  endpoint.publish_handler { |message|
    puts "Just received message on [#{message.topic_name}] payload [#{message.payload}] with QoS [#{message.qos_level}]"
    case message.qos_level
    when :AT_LEAST_ONCE
      endpoint.publish_acknowledge(message.message_id)
    when :EXACTLY_ONCE
      endpoint.publish_received(message.message_id)
    end
  }.publish_release_handler { |message_id|
    # second half of the QoS 2 exchange for messages sent by the client
    endpoint.publish_complete(message_id)
  }
}.listen(1883, "0.0.0.0") { |ar_err, _ar|
  # ar_err is nil when the server started successfully
  if ar_err.nil?
    puts "MQTT server is listening on port #{mqtt_server.actual_port}"
  else
    STDERR.puts "Error on starting the server: #{ar_err.get_message}"
  end
}