博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
进程间通信方式--消息队列
阅读量:3955 次
发布时间:2019-05-24

本文共 5224 字,大约阅读时间需要 17 分钟。

消息队列


定义


消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。 每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。

Linux用宏MSGMAXMSGMNB来限制一条消息的最大长度和一个队列的最大长度。

消息队列就是一个消息的链表。可以把消息看作一个记录,具有特定的格式以及特定的优先级。对消息队列有写权限的进程可以向中按照一定的规则添加新消息;对消息队列有读权限的进程则可以从消息队列中读走消息。消息队列是随内核持续的,记录消息队列的数据结构位于内核中,只有在内核重起或者显示删除一个消息队列时,该消息队列才会真正被删除。

通过上面的分析我们知道通过struct kern_ipc_perm的指针可以找到相应的条目,在消息队列中,我们的每一个条目为一个消息队列msg_queue,定义在/include/linux/msg.h

struct msg_queue {
struct kern_ipc_perm q_perm; time_t q_stime; /* last msgsnd time */ time_t q_rtime; /* last msgrcv time */ time_t q_ctime; /* last change time */ unsigned long q_cbytes; /* current number of bytes on queue */ unsigned long q_qnum; /* number of messages in queue */ unsigned long q_qbytes; /* max number of bytes on queue */ pid_t q_lspid; /* pid of last msgsnd */ pid_t q_lrpid; /* last receive pid */ struct list_head q_messages; struct list_head q_receivers; struct list_head q_senders;};

消息队列的使用


对消息队列的操作有:

#define SEMOP		 1//改变信号量的值#define SEMGET		 2//打开或者创建一个信号量#define SEMCTL		 3//消息量控制#define SEMTIMEDOP	 4

打开或者创建消息队列


#include 
#include
#include
int msgget(key_t key, int msgflg);

参数1

System V IPC中方通过key来唯一标识一个IPC对象,在消息队列中,一个key唯一标识一个队列。

参数2

msgflg低端的九个位为权限标志。

创建一个新的消息队列,需要设置IPC_CREAT标志,即:msgflg |=IPC_CREAT
如果创建的ID已经存在,此时函数不会出现错误,而只是忽略创建标志。
如果msgflg 是和IPC_CREATIPC_EXCL一起使用,,可以确保创建的是一个新的IPC对象,那么如果创建的ID已经存在,此时将会返回错误。

成功将会返回一个队列标识符(msqid),失败返回-1

发送消息到消息队列


int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

msgsnd的发送的数据保存完整性,要么全部发送成功,要么失败,不会发送部分数据。

参数1

msqidmsget的返回值;

参数2

msgq:消息的具体内容,指向一个struct msgbuf类型的结构体(可自定义):

struct msgbuf {
long mtype; /* type of message */ char *mtext;};

参数3

msgsz:发送消息的内容的长度

参数4

msgflg是用来控制当前消息队列无法容纳发送过来的数据或者消息的个数达到系统的限制数目时,操作的阻塞或者直接返回。

如果被设置了IPC_NOWAIT,函数将立即返回,不会发送消息,并且返回值为-1;
如果清除了该标志,函数将会挂起等待队列腾出可用空间,直到可以容纳完整消息或者消息队列被删除,或被信号中断;

从消息队列读取消息


ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,int msgflg);

参数1

msqidmsget的返回值;

参数2

msgq:消息的具体内容,指向一个struct msgbuf类型的结构体(可自定义):

struct msgbuf {
long mtype; /* type of message */ char *mtext;};

参数3

msgsz:读取消息的内容的长度

参数4

msgty是一个long类型的整数,用来标识要接受消息类别。

msgty 描述
=0 获取队列第一个可用消息
>0 接受第一个相同类型的第一个消息
<0 获取类型等于或小于msgtyp绝对值的第一个消息

msgflg是用来控制当前消息队列无法容纳发送过来的数据或者消息的个数达到系统的限制数目时,操作的阻塞或者直接返回。

如果被设置了IPC_NOWAIT,函数将立即返回,不会发送消息,并且返回值为-1;
如果清除了该标志,函数将会挂起等待队列腾出可用空间,直到可以容纳完整消息或者消息队列被删除,或被信号中断;

参数5

msgflg用来控制当前队列没有相应类型的消息可以接受时,采取的操作。

如果被设置为IPC_NOWAIT,函数将会立即返回,返回值为-1。如果该标志被清除,进程将会挂起等待,直到相应的消息到达,或者消息队列被删除,或被信号中断;

消息队列控制函数

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

参数1

msqidmsget的返回值;

参数2

根据cmd的不同,该函数功能不一样

cmd 描述
IPC_STAT 检索当期当前消息队列的属性,返回的值储存在一个struct msqid_ds结构体中
IPC_SET 如果进程有足够权限,可以利用buf来设置队列属性
IPC_RMID 用于删除队列

struct msqid_ds是一个定义在/include/linux/msg.h中的结构体

struct msqid_ds {
struct ipc_perm {
__kernel_key_t key; __kernel_uid_t uid; __kernel_gid_t gid; __kernel_uid_t cuid; __kernel_gid_t cgid; __kernel_mode_t mode; unsigned short seq; }; struct msg *msg_first; /* first message on queue,unused */ struct msg *msg_last; /* last message in queue,unused */ __kernel_time_t msg_stime; /* last msgsnd time */ __kernel_time_t msg_rtime; /* last msgrcv time */ __kernel_time_t msg_ctime; /* last change time */ unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */ unsigned long msg_lqbytes; /* ditto */ unsigned short msg_cbytes; /* current number of bytes on queue */ unsigned short msg_qnum; /* number of messages in queue */ unsigned short msg_qbytes; /* max number of bytes on queue */ __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */ __kernel_ipc_pid_t msg_lrpid; /* last receive pid */};

消息队列的实现


实现让不相关的进程进行行通信,将会编写两个程序,msgreceive.cmsgsned.c来表示接收和发送信息。两个程序都可以创建消息,但只有接收者在接收完最后一个消息之后,才把它删除。

msgreceive.c

#include 
#include
#include
#include
#include
#include
#define MAX_SIZE 1024struct msg_st{ long int mtype; char buf[MAX_SIZE];};int main(void){ int msgid; struct msg_st data; int running = 1; long int msgtype = 0; //注意1 //创建消息队列 if((msgid = msgget((key_t)1234, 0666|IPC_CREAT)) == -1){ printf("msgget failed!\n"); return -1; } //接收消息 while(running){ if(msgrcv(msgid, (void *)&data, MAX_SIZE, msgtype, 0) == -1){ printf("msgrcv failed receive!\n"); exit(1); } printf("You wrote: %s\n",data.buf); //遇到end结束 if(strncmp(data.buf,"end",3) == 0){ running =0; } } return 0;}

msgsnd.c

#include 
#include
#include
#include
#include
#include
#define MAX_SIZE 1024struct msg_st{ long int mtype; char buf[MAX_SIZE];};int main(void){ int msgid; struct msg_st data; char buffer[MAX_SIZE]; int running = 1; //建立消息队列 msgid = msgget((key_t)1234, 0666 | IPC_CREAT); if(msgid == -1) { printf( "msgget failed!\n"); exit(1); } //向消息队列中写消息,直到写入end while(running) { //输入数据 printf("Enter some text: "); fgets(buffer, MAX_SIZE, stdin); data.mtype = 1; //注意2 strcpy(data.buf, buffer); //向队列发送数据 if(msgsnd(msgid, (void*)&data, MAX_SIZE, 0) == -1) { printf( "msgsnd failed\n"); exit(1); } //输入end结束输入 if(strncmp(buffer, "end", 3) == 0) running = 0; sleep(1); } return 0;}

运行结果:

在这里插入图片描述

转载地址:http://unvzi.baihongyu.com/

你可能感兴趣的文章
Ubuntu 各版本代号简介
查看>>
mysql创建只读和运维用户
查看>>
配置本地yum源2
查看>>
SQLServer 2016报错Microsoft R Open 和 Microsoft R Server的解决方案
查看>>
shell获取最新文件
查看>>
SqlServer2016添加计划作业
查看>>
用Navicat备份SqlServer表,制作定时任务
查看>>
VM虚拟机桥接出现connect: network is unreachable
查看>>
windows计划任务执行,但是程序未执行
查看>>
ifcfg-ens33中ip和ifconfig出来的ip不一致
查看>>
nginx正常运行,访问报错502 bad gateway
查看>>
windows下批处理杀死进程
查看>>
SqlServer2016没有management和sa无法登陆
查看>>
VM虚拟机ping通同网段,无法上公网
查看>>
VMware虚拟机桥接网络配置设置
查看>>
bat拷贝文件
查看>>
Window设置生成dmp文件配置说明
查看>>
shell脚本实现删除指定日期之前的文件
查看>>
centos 7配置mysql yum源(自行选择哪个版本)
查看>>
批处理 %~dp0
查看>>