RocketMQ簡(jiǎn)單實(shí)例搭建
安裝RocketMQ需要jdk1.6以上, maven,git環(huán)境,以上環(huán)境自行百度命令安裝。
git clone https://github.com/alibaba/RocketMQ.git ##從github上下載RocketMQ開源項(xiàng)目
cd RocketMQ ##進(jìn)入文件夾
sh install.sh ##開始安裝
安裝完之后可以看到下圖這樣:其中可以看到一個(gè)符號(hào)鏈接devenv如紅框所示
然后
cd devenv/bin ##進(jìn)入鏈接的目錄下的bin目錄
nohup sh mqnamesrv -n "121.42.179.195:9876" & ##配置nameserver,121.42.179.195是本機(jī)ip,也就是服務(wù)器外網(wǎng)地址
nohup sh mqbroker -n "121.42.179.195:9876" & ##配置broker,121.42.179.195同上
之后
cat nohup.out
在輸出的最低端,可以看到紅框中的兩句話則說明nameserver和broker啟動(dòng)成功。
如果服務(wù)器內(nèi)存不夠,你就會(huì)啟動(dòng)失敗,可以修改runbroker.sh腳本(mqbroker文件中通過runbroker.sh腳本調(diào)用Broker的主函數(shù)com.alibaba.rocketmq.broker.BrokerStartup啟動(dòng)Broker)的JAVA_OPT參數(shù)
vi runbroker.sh
我阿里云內(nèi)存小,我就改成這樣
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m"
2、編寫Consumer和Producer測(cè)試類
首先需要的jar包如下:
4.0.0
groupId
RocketMQ
1.0-SNAPSHOT
org.apache.maven.plugins
maven-compiler-plugin
1.6
1.6
RocketMQTest
http://maven.apache.org
UTF-8
com.alibaba.rocketmq
rocketmq-client
3.0.10
com.alibaba.rocketmq
rocketmq-all
3.0.10
pom
ch.qos.logback
logback-classic
1.1.1
ch.qos.logback
logback-core
1.1.1
junit
junit
4.10
test
Consumer類:
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"rmq-group");
consumer.setNamesrvAddr("121.42.179.195:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("TopicA-test", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
Producer類:
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("121.42.179.195:9876");
producer.setInstanceName("producer");
producer.start();
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000); //每秒發(fā)送一次MQ
Message msg = new Message("TopicA-test",// topic
"TagA",// tag
(new Date() + "Hello RocketMQ ,QuickStart" + i)
.getBytes()// body
);
SendResult sendResult = producer.send(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
之后分別啟動(dòng),Consumer效果如下
這里需要注意,
默認(rèn)情況下,一臺(tái)服務(wù)器只能啟動(dòng)一個(gè)Producer或Consumer實(shí)例,所以如果需要在一臺(tái)服務(wù)器啟動(dòng)多個(gè)實(shí)例,需要設(shè)置實(shí)例的名稱,如要再建一個(gè)producer:
producer.setNamesrvAddr(“121.42.179.195:9876”);
producer.setInstanceName(“Producer2”);