Skip to content

Commit

Permalink
C#
Browse files Browse the repository at this point in the history
  • Loading branch information
Nitesh Bansal committed Apr 25, 2018
1 parent 9bc5a96 commit 4401da7
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 0 deletions.
30 changes: 30 additions & 0 deletions RckMQC#/BroadCastProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using org.apache.rocketmq.client.producer;
using org.apache.rocketmq.common.message;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Producer
{
class BroadCastProducer
{
static void Main(string[] args)
{
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
String[] tags = new String[] { "TagA", "TagB" };
for (int i = 0; i < 50; i++)
{
Message msg = new Message("BroadTopicTest",
tags[i % 2],
"OrderID188",
Encoding.Default.GetBytes("Hello world"+i));
SendResult sendResult = producer.send(msg);
Console.WriteLine(sendResult);
}
producer.shutdown();
}
}
}
32 changes: 32 additions & 0 deletions RckMQC#/BroadcastConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using org.apache.rocketmq.client.consumer;
using org.apache.rocketmq.client.consumer.listener;
using org.apache.rocketmq.common.consumer;
using org.apache.rocketmq.common.protocol.heartbeat;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Consumer
{
class BroadcastConsumer
{
static void Main(string[] args)
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BroadcastConsumerGroup");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);


consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.subscribe("BroadTopicTest", "TagA");

consumer.registerMessageListener(new TestListener());

consumer.start();
Console.WriteLine("Broadcast Consumer Started.");
}
}
}
25 changes: 25 additions & 0 deletions RckMQC#/Consumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using org.apache.rocketmq.client.consumer;
using org.apache.rocketmq.client.consumer.listener;
using org.apache.rocketmq.common.consumer;
using org.apache.rocketmq.common.message;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Consumer
{
class Consumer
{
static void Main(string[] args)
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("defaulttopic", "*");
consumer.registerMessageListener(new TestListener() );
consumer.start();
Console.WriteLine("Consumer Started.");
}
}
}
25 changes: 25 additions & 0 deletions RckMQC#/MessageQueueSelector1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using java.util;
using org.apache.rocketmq.client.producer;
using org.apache.rocketmq.common.message;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Producer
{
class MessageQueueSelector1: MessageQueueSelector
{



public MessageQueue select(List mqs, Message msg, Object arg)
{
int id = (int)arg;
int index = id % mqs.size();
return (MessageQueue)mqs.get(index);
}

}
}
30 changes: 30 additions & 0 deletions RckMQC#/OrderConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using java.util.concurrent.atomic;
using org.apache.rocketmq.client.consumer;
using org.apache.rocketmq.common.consumer;
using org.apache.rocketmq.common.message;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Consumer
{
class OrderConsumer
{
static void Main(string[] args)
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new TestListener1());

consumer.start();

Console.WriteLine("Consumer Started");
}
}
}
34 changes: 34 additions & 0 deletions RckMQC#/OrderProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using java.lang;
using org.apache.rocketmq.client.producer;
using org.apache.rocketmq.common.message;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Producer
{
class OrderProducer
{
static void Main(string[] args)
{
MQProducer producer = new DefaultMQProducer("example_group_name");
//Launch the instance.
producer.start();
string[] tags = new string[] { "TagA", "TagB", "TagC", "TagD", "TagE" };
for (int i = 0; i < 100; i++)
{
int orderId = i % 10;

Message msg = new Message("TopicTestjjj", tags[i % tags.Length], "KEY" + i,
Encoding.Default.GetBytes("Hello RocketMQ " + i));
SendResult sendResult = producer.send(msg, new MessageQueueSelector1()
, orderId);

Console.WriteLine(sendResult);
}
producer.shutdown();
}
}
}
42 changes: 42 additions & 0 deletions RckMQC#/Producer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using org.apache.rocketmq.client.producer;
using org.apache.rocketmq.common.message;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Producer
{
class Producer
{
static void Main(string[] args)
{
DefaultMQProducer p = new DefaultMQProducer("ProducerGroup");
p.start();



for (int i = 0; i < 100; i++)
{
try
{
var data = Encoding.Default.GetBytes("Hello To All"+i);
Message m = new Message("defaulttopic", data);

SendResult sendResult = p.send(m);

Console.WriteLine(sendResult);
}
catch (Exception e)
{

Console.WriteLine(e.Message);
break;
}
}
p.shutdown();
Console.ReadKey();
}
}
}
24 changes: 24 additions & 0 deletions RckMQC#/ScheduledMessageConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using org.apache.rocketmq.client.consumer;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Consumer
{
class ScheduledMessageConsumer
{
static void Main(string[] args)
{

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");

consumer.subscribe("TestTopic", "*");

consumer.registerMessageListener(new TestListner());

consumer.start();
}
}
}
32 changes: 32 additions & 0 deletions RckMQC#/ScheduledMessageProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using org.apache.rocketmq.client.producer;
using org.apache.rocketmq.common.message;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Producer
{
class ScheduledMessageProducer
{
static void Main(string[] args)
{
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");

producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++)
{
Message message = new Message("TestTopic", Encoding.Default.GetBytes("Hello scheduled message " + i));

message.setDelayTimeLevel(3);

producer.send(message);
}


producer.shutdown();
}
}
}
27 changes: 27 additions & 0 deletions RckMQC#/TestListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using java.util;
using org.apache.rocketmq.client.consumer.listener;
using org.apache.rocketmq.common.message;
using System.Collections.Generic;
using System.Text;
using System;

namespace Consumer
{
internal class TestListener : MessageListenerConcurrently
{


public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext ccc)
{
for(int i = 0 ; i < list.size(); i++ )
{
var msg = list.get(i) as Message;
byte[] body = msg.getBody();
var str = Encoding.Default.GetString(body);
Console.WriteLine(str);

}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
28 changes: 28 additions & 0 deletions RckMQC#/TestListner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using java.util;
using org.apache.rocketmq.client.consumer.listener;
using org.apache.rocketmq.common.message;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Consumer
{
internal class TestListner : MessageListenerConcurrently
{

public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext ccc)
{
for (int i = 0; i < list.size(); i++)
{
var msg = list.get(i) as Message;

Console.WriteLine(" Receive message " + msg.getBuyerId() + msg.getDelayTimeLevel() + "ms later");

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
51 changes: 51 additions & 0 deletions RckMQC#/TestListner1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using java.util;
using java.util.concurrent.atomic;
using org.apache.rocketmq.client.consumer.listener;
using org.apache.rocketmq.common.message;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Consumer
{
internal class TestListener1 : MessageListenerOrderly
{

AtomicLong consumeTimes = new AtomicLong(0);
public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext context)
{

context.setAutoCommit(false);
for (int i = 0; i < list.size(); i++)
{
var msg = list.get(i) as Message;
byte[] body = msg.getBody();
var str = Encoding.Default.GetString(body);
Console.WriteLine(str);

}
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0)
{
return ConsumeOrderlyStatus.SUCCESS;
}
else if ((this.consumeTimes.get() % 3) == 0)
{
return ConsumeOrderlyStatus.ROLLBACK;
}
else if ((this.consumeTimes.get() % 4) == 0)
{
return ConsumeOrderlyStatus.COMMIT;
}
else if ((this.consumeTimes.get() % 5) == 0)
{
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;

}
}
}

0 comments on commit 4401da7

Please sign in to comment.