如何用JAVA实现Linux上的消息队列功能

如题所述

下面来说说如何用不用消息队列来进行进程间的通信,消息队列与命名管道有很多相似之处。有关命名管道的更多内容可以参阅我的另一篇文章:Linux进程间通信——使用命名管道
一、什么是消息队列
消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。 每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。
Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度。
二、在Linux中使用消息队列
Linux提供了一系列消息队列的函数接口来让我们方便地使用它来实现进程间的通信。它的用法与其他两个System V PIC机制,即信号量和共享内存相似。
1、msgget函数
该函数用来创建和访问一个消息队列。它的原型为:
int msgget(key_t, key, int msgflg);

与其他的IPC机制一样,程序必须提供一个键来命名某个特定的消息队列。msgflg是一个权限标志,表示消息队列的访问权限,它与文件的访问权限一样。msgflg可以与IPC_CREAT做或操作,表示当key所命名的消息队列不存在时创建一个消息队列,如果key所命名的消息队列存在时,IPC_CREAT标志会被忽略,而只返回一个标识符。
它返回一个以key命名的消息队列的标识符(非零整数),失败时返回-1.
2、msgsnd函数
该函数用来把消息添加到消息队列中。它的原型为:
int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);

msgid是由msgget函数返回的消息队列标识符。
msg_ptr是一个指向准备发送消息的指针,但是消息的数据结构却有一定的要求,指针msg_ptr所指向的消息结构一定要是以一个长整型成员变量开始的结构体,接收函数将用这个成员来确定消息的类型。所以消息结构要定义成这样:
struct my_message{
long int message_type;
/* The data you wish to transfer*/
};

msg_sz是msg_ptr指向的消息的长度,注意是消息的长度,而不是整个结构体的长度,也就是说msg_sz是不包括长整型消息类型成员变量的长度。
msgflg用于控制当前消息队列满或队列消息到达系统范围的限制时将要发生的事情。
如果调用成功,消息数据的一分副本将被放到消息队列中,并返回0,失败时返回-1.
3、msgrcv函数
该函数用来从一个消息队列获取消息,它的原型为
int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg);

msgid, msg_ptr, msg_st的作用也函数msgsnd函数的一样。
msgtype可以实现一种简单的接收优先级。如果msgtype为0,就获取队列中的第一个消息。如果它的值大于零,将获取具有相同消息类型的第一个信息。如果它小于零,就获取类型等于或小于msgtype的绝对值的第一个消息。
msgflg用于控制当队列中没有相应类型的消息可以接收时将发生的事情。
调用成功时,该函数返回放到接收缓存区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓存区中,然后删除消息队列中的对应消息。失败时返回-1.
4、msgctl函数
该函数用来控制消息队列,它与共享内存的shmctl函数相似,它的原型为:
int msgctl(int msgid, int command, struct msgid_ds *buf);

command是将要采取的动作,它可以取3个值,
IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值。
IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值

IPC_RMID:删除消息队列
buf是指向msgid_ds结构的指针,它指向消息队列模式和访问权限的结构。msgid_ds结构至少包括以下成员:
struct msgid_ds
{
uid_t shm_perm.uid;
uid_t shm_perm.gid;
mode_t shm_perm.mode;
};

成功时返回0,失败时返回-1.
三、使用消息队列进行进程间通信
马不停蹄,介绍完消息队列的定义和可使用的接口之后,我们来看看它是怎么让进程进行通信的。由于可以让不相关的进程进行行通信,所以我们在这里将会编写两个程序,msgreceive和msgsned来表示接收和发送信息。根据正常的情况,我们允许两个程序都可以创建消息,但只有接收者在接收完最后一个消息之后,它才把它删除。
接收信息的程序源文件为msgreceive.c的源代码为:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/msg.h>

struct msg_st
{
long int msg_type;
char text[BUFSIZ];
};

int main()
{
int running = 1;
int msgid = -1;
struct msg_st data;
long int msgtype = 0; //注意1

//建立消息队列
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if(msgid == -1)
{
fprintf(stderr, "msgget failed with error: %d\n", errno);
exit(EXIT_FAILURE);
}
//从队列中获取消息,直到遇到end消息为止
while(running)
{
if(msgrcv(msgid, (void*)&data, BUFSIZ, msgtype, 0) == -1)
{
fprintf(stderr, "msgrcv failed with errno: %d\n", errno);
exit(EXIT_FAILURE);
}
printf("You wrote: %s\n",data.text);
//遇到end结束
if(strncmp(data.text, "end", 3) == 0)
running = 0;
}
//删除消息队列
if(msgctl(msgid, IPC_RMID, 0) == -1)
{
fprintf(stderr, "msgctl(IPC_RMID) failed\n");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}

发送信息的程序的源文件msgsend.c的源代码为:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/msg.h>
#include <errno.h>

#define MAX_TEXT 512
struct msg_st
{
long int msg_type;
char text[MAX_TEXT];
};

int main()
{
int running = 1;
struct msg_st data;
char buffer[BUFSIZ];
int msgid = -1;

//建立消息队列
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if(msgid == -1)
{
fprintf(stderr, "msgget failed with error: %d\n", errno);
exit(EXIT_FAILURE);
}

//向消息队列中写消息,直到写入end
while(running)
{
//输入数据
printf("Enter some text: ");
fgets(buffer, BUFSIZ, stdin);
data.msg_type = 1; //注意2
strcpy(data.text, buffer);
//向队列发送数据
if(msgsnd(msgid, (void*)&data, MAX_TEXT, 0) == -1)
{
fprintf(stderr, "msgsnd failed\n");
exit(EXIT_FAILURE);
}
//输入end结束输入
if(strncmp(buffer, "end", 3) == 0)
running = 0;
sleep(1);
}
exit(EXIT_SUCCESS);
}

转载仅供参考,版权属于原作者。祝你愉快,满意请采纳哦
温馨提示:内容为网友见解,仅供参考
无其他回答

如何用JAVA实现Linux上的消息队列功能
你好,进入Linux后,点击[应用程序],选择[系统设置],再选择[网络],打开网络设置页面,点击[新建],在列表中选择[xDSL],下一步选择连接xDSL的网卡(如果你有两块网卡的话),再设置名称,用户名、密码信息后,点击][应用]即设置完毕。点击[激活]即可连接到互联网。

到底什么是消息队列?Java中如何实现消息队列
比如你写日志,因为可能一个客户端有多个操作去写,又有很多个客户端,显然并发不能无穷大,于是你就需要把写日志的请求放入到消息队列里,在消费者那边依次把队列中产生的日志写到数据库里。至于怎么实现消息队列,其实你本身一个普通的队列就行呀~看你需要什么附加功能而已。

数据结构—队列(Queue)的原理以及Java实现案例
可以看出来,使用链式结构实现队列相比顺序结构的实现更加简单。3.2 队列的链式存储结构简单实现\/***队列的链式储存结构的简单单链表实现*\/publicclassMySingleLinkedQueue<E>{\/***空构造器,内部的节点均没有初始化,在第一次添加时才会初始化。*\/publicMySingleLinkedQueue(){}\/***元素个数*\/privateintsize;\/***...

到底什么是消息队列?Java中如何实现消息队列
消息队列是线程间通讯的手段:import java.util.*public class MsgQueue{private Vector queue = null;public MsgQueue(){queue = new Vector();}public synchronized void send(Object o){queue.addElement(o);}public synchronized Object recv(){if(queue.size()==0)return null;Object o = que...

到底什么是消息队列?Java中如何实现消息队列
通俗的说,就是一个容器,你把消息丢进去,不需要立即处理。然后有个程序去从你的容器里面把消息一条条读出来处理。消息队列,可以是activeMQ,kafka之类的,也可以是数据库的一张任务表。个人觉得消息队列,主要有两个作用:降低耦合消息可以暂时存在在消息队列中,等待消息接收者根据自身的负载处理能力...

Java中关于如何实现多线程消息队列的实例(java多线程通信)
消息队列是线程间通讯的手段:importjava.util.publicclassMsgQueue{ privateVectorqueue=null;publicMsgQueue(){ queue=newVector();} publicvoidsend(Objecto){ queue.addElement(o);} publicObjectrecv(){ if(queue.size()==0)returnnull;Objecto=queue.();queue.(0);\/\/orqueue[0]=nullcan...

消息队列原理及选型
Queue(队列) PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收。 Message(消息体) 根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输 点对点模型用于消息生产者和消息消费者之间点到点的通信。 点对点模式包含三个角色: 每个消息都被发送到一个特定的...

java消息队列应用在哪些场景
1. 异步处理 Java消息队列在异步处理中发挥着重要作用。当一个系统需要处理大量请求,但又不希望立即完成这些请求时,可以使用消息队列来实现异步处理。通过将请求放入消息队列,系统可以立即返回响应,而后台线程则负责逐步处理这些请求。这种方式可以显著提高系统的响应速度和吞吐量。2. 解耦系统组件 消息队列...

Java开发中消息队列和rpc框架都是做什么的?
一,消息队列服务一般用于设计多系统之间的信息传输,一般这种传输不需要对方对数据做出回应。它最常见的方式是构建异步的生产者-消费者模式。我们在系统开发中,有些业务并不需要及时返回结果,我们可以把这些操作放到队列中,然后另起一个消费者去处理它。比如日志,数据库异步更新。二,rpc一般是用于服务...

消息队列概念
队列(Queue) ,是先进先出(FIFO, First-In-First-Out)的线性表,通俗的讲队列就是一群人或者事物按照排好的顺序等待接受服务或者处理 本地队列按照功能可划分为初始化队列,传输队列,目标队列和死信队列。初始化队列用作消息触发功能。传输队列只是暂存待传的消息,条件许可的情况下,通过管道将消息...

相似回答