Skip to content

Commit

Permalink
Add bidirectional pipe server and client. Fix discovery server and cl…
Browse files Browse the repository at this point in the history
…ient.
  • Loading branch information
Jason Hurt authored and Jason Hurt committed Jan 22, 2010
1 parent 7e31392 commit f583f54
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 71 deletions.
8 changes: 6 additions & 2 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,26 @@
<path location="${source.dir}"/>
</classpath>
<sysproperty key="clojure.compile.path" value="${bin.dir}"/>
<arg value="com.jhurt.Discovery"/>
<arg value="com.jhurt.Math"/>
<arg value="com.jhurt.NNGenerator"/>
<arg value="com.jhurt.Plot3D"/>
<arg value="com.jhurt.SwingUtils"/>
<arg value="com.jhurt.ThreadUtils"/>
<arg value="com.jhurt.UI"/>

<arg value="com.jhurt.p2p.DiscoveryClient"/>
<arg value="com.jhurt.p2p.DiscoveryServer"/>
<arg value="com.jhurt.p2p.Jxta"/>
<arg value="com.jhurt.p2p.PipeClient"/>
<arg value="com.jhurt.p2p.PipeServer"/>

<arg value="com.jhurt.nn.ActivationFunctions"/>
<arg value="com.jhurt.nn.BackPropagation"/>
<arg value="com.jhurt.nn.Clusterer"/>
<arg value="com.jhurt.nn.FirstPrincipalComponent"/>
<arg value="com.jhurt.nn.Input"/>
<arg value="com.jhurt.nn.PerceptronHaykin"/>
<arg value="com.jhurt.nn.PerceptronRojas"/>

</java>
</target>

Expand Down
49 changes: 49 additions & 0 deletions src/com/jhurt/p2p/DiscoveryClient.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
;;Copyright (c) 2010, University of Nevada, Las Vegas
;;All rights reserved.
;;
;;Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
;;
;; * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
;; * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
;; * Neither the name of the University of Nevada, Las Vegas, nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
;;
;;THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

(ns com.jhurt.p2p.DiscoveryClient
(:gen-class)
(:use [com.jhurt.p2p.Jxta :as Jxta]))

(import
'(net.jxta.discovery DiscoveryEvent DiscoveryListener DiscoveryService)
'(net.jxta.document Advertisement AdvertisementFactory)
'(net.jxta.peergroup PeerGroup PeerGroupID)
'(net.jxta.protocol DiscoveryResponseMsg PipeAdvertisement)
'(net.jxta.platform NetworkManager NetworkManager$ConfigMode)
'(net.jxta.id IDFactory)
'(net.jxta.pipe PipeService)
'(java.io File)
'(java.util Enumeration))

(def manager (new NetworkManager NetworkManager$ConfigMode/ADHOC "DiscoveryClient"
(.toURI (new File (new File ".cache") "DiscoveryClient"))))

(def advertisementName "SOMEUNIQUENAME")

(defn heartbeat [discoveryService]
(while true
(let [waitTime 6000
pipeAdv (Jxta/getNewPipeAdvertisement advertisementName)]
(println "client publishing adv: " (str pipeAdv))
(.remotePublish discoveryService pipeAdv)
(println "client sleeping for: " waitTime)
(Thread/sleep waitTime))))

(defn startClient [discoveryService]
(println "*************************starting client node*************************\n")
(heartbeat discoveryService))

(defn -main []
(.startNetwork manager)
(let [netPeerGroup (.getNetPeerGroup manager)
discoveryService (.getDiscoveryService netPeerGroup)]
(future (startClient discoveryService))))
105 changes: 36 additions & 69 deletions src/com/jhurt/Discovery.clj → src/com/jhurt/p2p/DiscoveryServer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
;;
;;THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

(ns com.jhurt.Discovery
(:use [com.jhurt.ThreadUtils :as ThreadUtils]))
(ns com.jhurt.p2p.DiscoveryServer
(:gen-class))

(import
'(net.jxta.discovery DiscoveryEvent DiscoveryListener DiscoveryService)
'(net.jxta.document Advertisement AdvertisementFactory)
'(net.jxta.peergroup PeerGroup PeerGroupID)
'(net.jxta.protocol DiscoveryResponseMsg PipeAdvertisement)
'(net.jxta.impl.protocol ModuleImplAdv)
'(net.jxta.platform NetworkManager NetworkManager$ConfigMode)
'(net.jxta.id IDFactory)
'(net.jxta.pipe PipeService)
Expand All @@ -26,82 +27,48 @@
(def manager (new NetworkManager NetworkManager$ConfigMode/ADHOC "DiscoveryServer"
(.toURI (new File (new File ".cache") "DiscoveryServer"))))

(def discoveryService (ref nil))

;called whenever Advertisement responses are received from remote peers by the Discovery Service.
(def defaultDiscoveryListener (proxy [DiscoveryListener] []
(discoveryEvent [#^DiscoveryEvent event]
(let [response (.getReponse event)
(let [response (.getResponse event)
advertisements (.getAdvertisements response)]
(println "server received discovery response from peer " (.getSource event) "\n")
(apply println advertisements)))))

(defn getPipeAdvertisement []
(doto (AdvertisementFactory/newAdvertisement (PipeAdvertisement/getAdvertisementType))
(.setPipeID (IDFactory/newPipeID PeerGroupID/defaultNetPeerGroupID))
(.setType PipeService/UnicastType)
(.setName "Discovery Tutorial")))
(apply
(fn [#^Advertisement adv]
(println "adv type: " (.getAdvType adv))
(println "adv id: " (.getID adv))
(println "adv desc: " (.getDescription adv))
(println "adv code: " (.getCode adv))
(println "adv uri: " (.getUri adv))
(println "adv provider: " (.getProvider adv)))
(enumeration-seq advertisements))))))

(defn serverLoop [discoveryService]
(defn serverLoop [#^DiscoveryService discoveryService]
(while true
(let [pipeAdv (getPipeAdvertisement)
lifetime 30000 expiration 30000 waitTime 31000]
(println "server publising pipeAdv:")
(println (.toString pipeAdv))
(doto discoveryService
(.publish pipeAdv lifetime expiration)
(.remotePublish #^Advertisement pipeAdv)) ;expiration))
(Thread/sleep waitTime))))
(println "server getting remote advertisements\n")
; look for any peer
(.getRemoteAdvertisements discoveryService
; no specific peer (propagate)
nil
; Adv type
DiscoveryService/ADV
; Attribute = name
nil
; Value = the tutorial
nil
; one advertisement response is all we are looking for
1
; no query specific listener. we are using a global listener
nil)
(Thread/sleep 15000)))


(defn startServer [discoveryService]
(println "*************************starting server node*************************\n\n")
(serverLoop discoveryService))

(defn clientLoop [discoveryService]
(while true
(let [waitTime 5000]
(println "client sleeping for: " waitTime)
(Thread/sleep waitTime)
(println "client sending a discovery message")
; look for any peer
(.getRemoteAdvertisements discoveryService
; no specific peer (propagate)
nil
; Adv type
DiscoveryService/ADV
; Attribute = name
"Name"
; Value = the tutorial
"Discovery tutorial"
; one advertisement response is all we are looking for
1
; no query specific listener. we are using a global listener
nil))))

(defn startClient [discoveryService]
(println "*************************starting client node*************************\n")
(.getRemoteAdvertisements discoveryService
; no specific peer
nil
; adv type
DiscoveryService/ADV
; attr = any
nil
; value = any
nil
; one advertisement response is all we are looking for
1
; no query specific listener. we are using a global listener
nil)
(clientLoop discoveryService))

(defn -main [s]
(defn -main []
(.startNetwork manager)
(let [netPeerGroup (.getNetPeerGroup manager)]
(dosync
(ref-set discoveryService (doto (.getDiscoveryService netPeerGroup)
(.addDiscoveryListener defaultDiscoveryListener))))
(future (startServer @discoveryService))
(future (startClient @discoveryService))
(future (startClient @discoveryService))
(read-line)))

(let [netPeerGroup (.getNetPeerGroup manager)
discoveryService (doto (.getDiscoveryService netPeerGroup) (.addDiscoveryListener defaultDiscoveryListener)) ]
(future (startServer discoveryService))))
49 changes: 49 additions & 0 deletions src/com/jhurt/p2p/Jxta.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
;;Copyright (c) 2010, University of Nevada, Las Vegas
;;All rights reserved.
;;
;;Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
;;
;; * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
;; * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
;; * Neither the name of the University of Nevada, Las Vegas, nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
;;
;;THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

(ns com.jhurt.p2p.Jxta)

(import
'(net.jxta.discovery DiscoveryEvent DiscoveryListener DiscoveryService)
'(net.jxta.document Advertisement AdvertisementFactory)
'(net.jxta.peergroup PeerGroup PeerGroupID)
'(net.jxta.protocol DiscoveryResponseMsg PipeAdvertisement)
'(net.jxta.platform NetworkManager NetworkManager$ConfigMode)
'(net.jxta.id IDFactory)
'(net.jxta.pipe PipeID PipeService)
'(java.io File)
'(java.util Enumeration)
'(java.net URI))

(def NETWORK_NAME "NNGeneratorNetwork")

(def MESSAGE_NAMESPACE_NAME "NNGenerator")

(def TEST_ELEMENT_NAME "test")
(def HEARTBEAT_ELEMENT_NAME "heartbeat")
(def RESPONSE_ELEMENT_NAME "response")


(def NN_SERVER_PIPE_ID (PipeID/create
(URI/create "urn:jxta:uuid-59616261646162614E504720503250338944BCED387C4A2BBD8E9411B78C284104")))

(defn getPipeAdvertisement []
(let [adv (AdvertisementFactory/newAdvertisement (PipeAdvertisement/getAdvertisementType))]
(doto adv
(.setPipeID NN_SERVER_PIPE_ID)
(.setType PipeService/UnicastType)
(.setName "JxtaBiDiPipe tutorial"))))

(defn getNewPipeAdvertisement [name]
(doto (AdvertisementFactory/newAdvertisement (PipeAdvertisement/getAdvertisementType))
(.setPipeID (IDFactory/newPipeID PeerGroupID/defaultNetPeerGroupID))
(.setType PipeService/UnicastType)
(.setName name)))
70 changes: 70 additions & 0 deletions src/com/jhurt/p2p/PipeClient.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
;;Copyright (c) 2010, University of Nevada, Las Vegas
;;All rights reserved.
;;
;;Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
;;
;; * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
;; * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
;; * Neither the name of the University of Nevada, Las Vegas, nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
;;
;;THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

(ns com.jhurt.p2p.PipeClient
(:gen-class)
(:use [com.jhurt.p2p.Jxta :as Jxta]))

(import
'(net.jxta.document AdvertisementFactory)
'(net.jxta.endpoint Message MessageElement StringMessageElement)
'(net.jxta.logging Logging)
'(net.jxta.peergroup PeerGroup)
'(net.jxta.pipe PipeID PipeMsgEvent PipeMsgListener PipeService)
'(net.jxta.protocol PipeAdvertisement)
'(net.jxta.platform NetworkManager NetworkManager$ConfigMode)
'(net.jxta.util JxtaBiDiPipe JxtaServerPipe)
'(net.jxta.id IDFactory)
'(net.jxta.pipe PipeService)
'(java.io File)
'(java.net URI))

(def pipe (ref nil))

(defn sendHeartbeat [#^JxtaBiDiPipe pipe]
(let [strMsgElement (new StringMessageElement Jxta/HEARTBEAT_ELEMENT_NAME "heartbeat" nil)
msg (doto (new Message) (.addMessageElement Jxta/MESSAGE_NAMESPACE_NAME strMsgElement))]
(.sendMessage pipe msg)))

(defn sendResponse [pipe]
(if-not (nil? pipe)
(let [respElement (new StringMessageElement Jxta/RESPONSE_ELEMENT_NAME "client_response_message" nil)
response (doto (new Message) (.addMessageElement Jxta/MESSAGE_NAMESPACE_NAME respElement))]
(.sendMessage pipe response))))

(def clientPipeMsgListener (proxy [PipeMsgListener] []
(pipeMsgEvent [#^PipeMsgEvent event]
(let [msg (.getMessage event)
msgElement (.getMessageElement msg Jxta/MESSAGE_NAMESPACE_NAME Jxta/TEST_ELEMENT_NAME)
currentThreadName (.getName (Thread/currentThread))]
(println "client thread " currentThreadName " received message: " msg "\nmsg element: " msgElement)))))

(defn heartbeat []
(while (not (nil? @pipe))
(do
(sendHeartbeat @pipe)
(Thread/sleep 30000))))

(defn run [netPeerGroup]
(let [connectPipe (Jxta/getPipeAdvertisement)]
(println "Attempting to establish a connection to: " (.getPipeID connectPipe))
(dosync (ref-set pipe (new JxtaBiDiPipe netPeerGroup connectPipe 20000 clientPipeMsgListener true)))
(println "JxtaBiDi pipe created\n\n")
(heartbeat)))

(defn -main []
(let [home (new File (new File ".nn_cache") "client")
manager (new NetworkManager NetworkManager$ConfigMode/ADHOC Jxta/NETWORK_NAME (.toURI home))]
(.startNetwork manager)
(let [netPeerGroup (.getNetPeerGroup manager)]
(.waitForRendezvousConnection manager 0)
(run netPeerGroup))))

Loading

0 comments on commit f583f54

Please sign in to comment.