Skip to content

Commit 066631e

Browse files
committed
Add new example
1 parent 43eba27 commit 066631e

File tree

7 files changed

+249
-9
lines changed

7 files changed

+249
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
#region Usings
2+
3+
using System;
4+
using System.Text;
5+
using System.Threading;
6+
using Extend;
7+
8+
#endregion
9+
10+
namespace Stomp.Net.Example.SelectorsCore
11+
{
12+
public class Program
13+
{
14+
#region Constants
15+
16+
private const String Host = "host";
17+
18+
private const String Password = "password";
19+
20+
private const Int32 Port = 63617;
21+
//private const Int32 Port = 61613;
22+
23+
private const String QueueName = "TestQ";
24+
private const String SelectorKey = "selectorProp";
25+
private const String User = "admin";
26+
27+
private const Int32 NoOfMessages = 30;
28+
29+
private static readonly ManualResetEventSlim _resetEvent = new ManualResetEventSlim( );
30+
31+
#endregion
32+
33+
public static void Main( String[] args )
34+
{
35+
// Configure a logger to capture the output of the library
36+
Tracer.Trace = new ConsoleLogger();
37+
38+
try
39+
{
40+
using ( var subscriber = new Subscriber() )
41+
{
42+
SendMessages();
43+
44+
Console.WriteLine($" [{Thread.CurrentThread.ManagedThreadId}] Start receiving messages.");
45+
46+
subscriber.Start();
47+
48+
Console.WriteLine( _resetEvent.Wait( 1.ToMinutes() ) ? "All messages received" : "Timeout :(" );
49+
}
50+
}
51+
catch ( Exception ex )
52+
{
53+
Console.WriteLine( $"Error: {ex}" );
54+
}
55+
56+
Console.WriteLine( "Press <enter> to exit." );
57+
Console.ReadLine();
58+
}
59+
60+
61+
private static void SendMessages()
62+
{
63+
var factory = GetConnectionFactory();
64+
65+
// Create connection for both requests and responses
66+
using ( var connection = factory.CreateConnection() )
67+
{
68+
// Open the connection
69+
connection.Start();
70+
71+
// Create session for both requests and responses
72+
using ( var session = connection.CreateSession( AcknowledgementMode.IndividualAcknowledge ) )
73+
{
74+
// Create a message producer
75+
IDestination destinationQueue = session.GetQueue( QueueName );
76+
using ( var producer = session.CreateProducer( destinationQueue ) )
77+
{
78+
producer.DeliveryMode = MessageDeliveryMode.Persistent;
79+
80+
for ( var i = 0; i < NoOfMessages; i++ )
81+
{
82+
// Send a message to the destination
83+
var message = session.CreateBytesMessage( Encoding.UTF8.GetBytes( $"Hello World {i}" ) );
84+
message.StompTimeToLive = TimeSpan.FromMinutes( 1 );
85+
message.Headers["test"] = $"test {i}";
86+
producer.Send( message );
87+
88+
Console.WriteLine($"Message sent {i}");
89+
}
90+
}
91+
}
92+
}
93+
}
94+
95+
private static ConnectionFactory GetConnectionFactory()
96+
{
97+
// Create a connection factory
98+
var brokerUri = "tcp://" + Host + ":" + Port;
99+
100+
return new ConnectionFactory( brokerUri,
101+
new StompConnectionSettings
102+
{
103+
UserName = User,
104+
Password = Password,
105+
TransportSettings =
106+
{
107+
SslSettings =
108+
{
109+
ServerName = "",
110+
ClientCertSubject = "",
111+
KeyStoreName = "My",
112+
KeyStoreLocation = "LocalMachine"
113+
}
114+
},
115+
SkipDesinationNameFormatting = false, // Determines whether the destination name formatting should be skipped or not.
116+
SetHostHeader = true, // Determines whether the host header will be added to messages or not
117+
HostHeaderOverride = null // Can be used to override the content of the host header
118+
} );
119+
120+
}
121+
122+
private class Subscriber : IDisposable
123+
{
124+
private IConnection _connection;
125+
private ISession _session;
126+
private IMessageConsumer _consumer;
127+
128+
private Int32 _noOfreceivedMessages;
129+
private readonly Object _sync = new Object();
130+
131+
public void Start()
132+
{
133+
var factory = GetConnectionFactory();
134+
135+
// Create connection for both requests and responses
136+
_connection = factory.CreateConnection();
137+
138+
// Open the connection
139+
_connection.Start();
140+
141+
// Create session for both requests and responses
142+
_session = _connection.CreateSession( AcknowledgementMode.IndividualAcknowledge );
143+
144+
// Create a message consumer
145+
IDestination sourceQueue = _session.GetQueue( QueueName );
146+
_consumer = _session.CreateConsumer( sourceQueue );
147+
148+
_consumer.Listener += message =>
149+
{
150+
var content = Encoding.UTF8.GetString( message.Content );
151+
Console.WriteLine( $" [{Thread.CurrentThread.ManagedThreadId}] {content}" );
152+
153+
Thread.Sleep( 500 );
154+
155+
message.Acknowledge();
156+
157+
lock ( _sync )
158+
{
159+
_noOfreceivedMessages++;
160+
161+
if ( _noOfreceivedMessages >= NoOfMessages )
162+
_resetEvent.Set();
163+
}
164+
};
165+
}
166+
167+
#region IDisposable
168+
169+
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
170+
public void Dispose()
171+
{
172+
_connection?.Dispose();
173+
_session?.Dispose();
174+
_consumer?.Dispose();
175+
}
176+
177+
#endregion
178+
}
179+
}
180+
181+
/// <summary>
182+
/// Console logger for Stomp.Net
183+
/// </summary>
184+
public class ConsoleLogger : ITrace
185+
{
186+
#region Implementation of ITrace
187+
188+
/// <summary>
189+
/// Writes a message on the error level.
190+
/// </summary>
191+
/// <param name="message">The message.</param>
192+
public void Error( String message )
193+
=> Console.WriteLine( $"[Error]\t\t{message}" );
194+
195+
/// <summary>
196+
/// Writes a message on the fatal level.
197+
/// </summary>
198+
/// <param name="message">The message.</param>
199+
public void Fatal( String message )
200+
=> Console.WriteLine( $"[Fatal]\t\t{message}" );
201+
202+
/// <summary>
203+
/// Writes a message on the info level.
204+
/// </summary>
205+
/// <param name="message">The message.</param>
206+
public void Info( String message )
207+
=> Console.WriteLine( $"[Info]\t\t{message}" );
208+
209+
/// <summary>
210+
/// Writes a message on the warn level.
211+
/// </summary>
212+
/// <param name="message">The message.</param>
213+
public void Warn( String message )
214+
=> Console.WriteLine( $"[Warn]\t\t{message}" );
215+
216+
#endregion
217+
}
218+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>netcoreapp2.0</TargetFramework>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<ProjectReference Include="..\..\Stomp.Net\Stomp.Net.csproj" />
10+
</ItemGroup>
11+
12+
</Project>

.Src/Stomp.Net.sln

+12-2
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Stomp.Net", "Stomp.Net\Stom
77
EndProject
88
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{7767F6AF-0AB9-4131-BD5B-37368DAB40C3}"
99
EndProject
10-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Stomp.Net.Example.SendReceiveCore", "Examples\Stomp.Net.Example.SendReceiveCore\Stomp.Net.Example.SendReceiveCore.csproj", "{DA9362AA-2A1A-48C0-B969-1AABBE18AEED}"
10+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Stomp.Net.Example.SendReceiveCore", "Examples\Stomp.Net.Example.SendReceiveCore\Stomp.Net.Example.SendReceiveCore.csproj", "{DA9362AA-2A1A-48C0-B969-1AABBE18AEED}"
1111
EndProject
12-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Stomp.Net.Example.SelectorsCore", "Examples\Stomp.Net.Example.SelectorsCore\Stomp.Net.Example.SelectorsCore.csproj", "{89254E05-3BC6-4E33-BB15-4B3FE35E47CE}"
12+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Stomp.Net.Example.SelectorsCore", "Examples\Stomp.Net.Example.SelectorsCore\Stomp.Net.Example.SelectorsCore.csproj", "{89254E05-3BC6-4E33-BB15-4B3FE35E47CE}"
13+
EndProject
14+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Stomp.Net.Example.SimpleEventReceive", "Examples\Stomp.Net.Example.SimpleEventReceive\Stomp.Net.Example.SimpleEventReceive.csproj", "{40546821-3104-4EDC-90F3-95AE70975AB7}"
1315
EndProject
1416
Global
1517
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -29,12 +31,20 @@ Global
2931
{89254E05-3BC6-4E33-BB15-4B3FE35E47CE}.Debug|Any CPU.Build.0 = Debug|Any CPU
3032
{89254E05-3BC6-4E33-BB15-4B3FE35E47CE}.Release|Any CPU.ActiveCfg = Release|Any CPU
3133
{89254E05-3BC6-4E33-BB15-4B3FE35E47CE}.Release|Any CPU.Build.0 = Release|Any CPU
34+
{40546821-3104-4EDC-90F3-95AE70975AB7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
35+
{40546821-3104-4EDC-90F3-95AE70975AB7}.Debug|Any CPU.Build.0 = Debug|Any CPU
36+
{40546821-3104-4EDC-90F3-95AE70975AB7}.Release|Any CPU.ActiveCfg = Release|Any CPU
37+
{40546821-3104-4EDC-90F3-95AE70975AB7}.Release|Any CPU.Build.0 = Release|Any CPU
3238
EndGlobalSection
3339
GlobalSection(SolutionProperties) = preSolution
3440
HideSolutionNode = FALSE
3541
EndGlobalSection
3642
GlobalSection(NestedProjects) = preSolution
3743
{DA9362AA-2A1A-48C0-B969-1AABBE18AEED} = {7767F6AF-0AB9-4131-BD5B-37368DAB40C3}
3844
{89254E05-3BC6-4E33-BB15-4B3FE35E47CE} = {7767F6AF-0AB9-4131-BD5B-37368DAB40C3}
45+
{40546821-3104-4EDC-90F3-95AE70975AB7} = {7767F6AF-0AB9-4131-BD5B-37368DAB40C3}
46+
EndGlobalSection
47+
GlobalSection(ExtensibilityGlobals) = postSolution
48+
SolutionGuid = {461AC20D-B4A7-45CA-91E6-49BD89C159C8}
3949
EndGlobalSection
4050
EndGlobal

.Src/Stomp.Net/NMS.Stomp/Connection.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ internal void OnSessionException( Session sender, Exception exception )
327327

328328
internal void RemoveDispatcher( ConsumerId id )
329329
{
330-
if ( !_dispatchers.TryRemove( id, out IDispatcher _ ) )
330+
if ( !_dispatchers.TryRemove( id, out _ ) )
331331
Tracer.Warn( $"Failed to remove dispatcher with id '{id}'." );
332332
}
333333

@@ -336,7 +336,7 @@ internal void RemoveSession( Session session )
336336
if ( _closing.Value )
337337
return;
338338

339-
if ( !_sessions.TryRemove( session, out Session _ ) )
339+
if ( !_sessions.TryRemove( session, out _ ) )
340340
Tracer.Warn( $"Failed to remove session with session id: '{session.SessionId}'." );
341341
}
342342

.Src/Stomp.Net/NMS.Stomp/Session.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void DisposeOf( ConsumerId consumerId )
8181
if ( _closing )
8282
return;
8383

84-
if ( !_consumers.TryRemove( consumerId, out MessageConsumer _ ) )
84+
if ( !_consumers.TryRemove( consumerId, out _ ) )
8585
Tracer.Warn( $"Failed to remove message consumer with consumer id: '{consumerId}'." );
8686
}
8787

@@ -90,7 +90,7 @@ public void DisposeOf( ProducerId producerId )
9090
if ( _closing )
9191
return;
9292

93-
if ( !_producers.TryRemove( producerId, out MessageProducer _ ) )
93+
if ( !_producers.TryRemove( producerId, out _ ) )
9494
Tracer.Warn( $"Failed to remove message producer with producer id: '{producerId}'." );
9595
}
9696

@@ -254,7 +254,7 @@ private void RemoveConsumer( MessageConsumer consumer )
254254
if ( _closing )
255255
return;
256256

257-
if ( !_consumers.TryRemove( consumer.ConsumerId, out MessageConsumer _ ) )
257+
if ( !_consumers.TryRemove( consumer.ConsumerId, out _ ) )
258258
Tracer.Warn( $"Failed to remove consumer with consumer id: '{consumer.ConsumerId}'." );
259259
}
260260

.Src/Stomp.Net/NMS.Stomp/Transport/ResponseCorrelator.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ protected override void OnCommand( ITransport sender, ICommand command )
107107

108108
if ( _requestMap.TryGetValue( correlationId, out var future ) )
109109
{
110-
if ( !_requestMap.TryRemove( correlationId, out FutureResponse _ ) )
110+
if ( !_requestMap.TryRemove( correlationId, out _ ) )
111111
Tracer.Warn( $"Failed to remove future response with id: '{correlationId}'." );
112112

113113
future.Response = response;

.Src/Stomp.Net/Transport/TcpTransport.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class TcpTransport : Disposable, ITransport
3737
/// <summary>
3838
/// Timeout for closing the connection.
3939
/// </summary>
40-
private TimeSpan _maxThreadWait = TimeSpan.FromMilliseconds( 30000 );
40+
private readonly TimeSpan _maxThreadWait = TimeSpan.FromMilliseconds( 30000 );
4141

4242
/// <summary>
4343
/// Reading thread (background).

0 commit comments

Comments
 (0)