消息队列(MSG)
1. 基本概念
Linux 消息队列是 Linux 系统中的一种进程间通信机制。它允许多个进程向一个消息队列中写入消息,或者从消息队列中读取消息。
消息队列是一种先进先出(FIFO)的数据结构,可以存储任意类型的消息。消息队列有两个指针:一个指向队列头,一个指向队列尾。写入消息的进程会向队列尾写入消息,而读取消息的进程则会从队列头读取消息。
消息队列可以用于进程间的同步和通信。例如,可以通过消息队列来实现进程间的异步通信,也可以用来实现进程间的同步。消息队列通常用于实现多线程应用程序,能够有效地提高应用程序的性能和可维护性。
Linux 消息队列的基本操作包括创建、打开、关闭、发送和接收消息等,使用方法一般是:
- 发送者:
- 获取消息队列的ID。
- 将数据放入一个附带有标识的特殊的结构体,发送给消息队列。
- 接收者:
- 获取消息队列的ID。
- 将指定标识的消息读出。
当发送者和接收者都不再使用消息队列时,需要及时删除它以释放系统资源。
2. 消息队列API
2.1 获取消息队列ID
msgget()
用于获取一个消息队列的描述符。该函数的原型如下:
int msgget(key_t key, int msgflg);
该函数接受两个参数:
key
:消息队列的键值。- xxxxxxxxxx1 1int shmget(key_t key, size_t size, int shmflg);jsx showLineNumbers
如果函数执行成功,则返回一个消息队列的描述符。否则,返回
-1
。
msgget()
函数用于获取一个消息队列的描述符。它能够通过消息队列的键值来查找消息队列,并返回一个用于操作消息队列的描述符。例如,可以使用 msgget()
函数来获取一个消息队列的描述符,然后通过该描述符来向消息队列发送消息。
2.2 发送消息
msgsnd()
用于向消息队列中写入消息。该函数的原型如下:
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
该函数接受四个参数:
msqid
:消息队列的描述符。msgp
:指向要写入消息队列的消息缓冲区的指针。msgsz
:要写入消息队列的消息的大小。msgflg
:控制写入消息队列的方式的标志。如果函数执行成功,则返回 0。否则,返回一个非 0 值。
msgsnd()
函数用于向消息 队列中写入消息。它能够将一个消息写入指定的消息队列,并在写入成功时返回。例如,可以使用 msgget()
函数获取消息队列的描述符,然后使用 msgsnd()
函数向该消息队列中写入消息。
- 选项msgflg是一个位屏蔽字,因此IPC_CREAT、IPC_EXCL和权限mode可以用位或的方式叠加起来,比如:
msgget(key, IPC_CREAT\0666);
表示如果key对应的消息队列不存在就创建,且权限指定为 0666,若已存在则直接获取 ID。 - 权限只有读和写,执行权限是无效的,例如0777跟0666是等价的。
- 当key被指定为IPC_PRVATE 时,系统会自动产生一个未用的 key来对应一个新的消息队列对象。一般用于线程间通信。
2.3 接收消息
msgrcv()
是 Linux 系统中的一个函数,用于从消息队列中读取消息。该函数的原型如下:
int msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
该函数接受五个参数:
msqid
:消息队列的描述符。msgp
:指向要用于存储读取到的消息的缓冲区的指针。msgsz
:要读取的消息的大小。msgtyp
:要读取的消息的类型。msgflg
:控制读取消息队列的方式的标志。如果函数执行成功,则返回实际读取到的消息的大小。否则,返回一个负值。
msgrcv()
函数用于从消息队列中读取消息。它能够从指定的消息队列中读取一条消息,并在读取成功时返回。例如,可以使用 msgget()
函数获取消息队列的描述符,然后使用 msgrcv()
函数从该消息队列中读取消息。
使用这两个收、发消息函数需要注意以下几点:
- 发送消息时,消息必须被组织成以下形式:
struct msgbuf {
long mtype;//消息的标识
char mtext[1];//消息的正文
}
也就是说:发送出去的消息必须以一个long型数据打头,作为该消息的标识,后面的数据则没有要求。
- 消息的标识可以是任意长整型数值,但不能是OL。
- 参数
msgsz
是消息中正文的大小,不包含消息的标识。
2.4 设置或获取消息队列属性
msgctl()
是 Linux 系统中的一个函数,用于控制消息队列。该函数的原型如下:
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
该函数接受三个参数:
msqid
:消息队列的描述符。cmd
:指定要进行的操作的类型。
- IPC_STAT:获取该MSG的信息,储存在结构体 msqid_ds 中。
- IPC_SET:设置该MSG的信息,储存在结构体 msqid_ds。
- IPC_RMID: 立即删除该MSG,并且唤醒所有阻塞在该MSG上的进程,同时忽略第三个参数。
- IPC_INFO:获得关于当前系统中 MSG的限制值信息。
- MSG_INFO:获得关于当前系统中MSG的相关资源消耗信息。
- MSG_STAT:同IPC_STAT,但 msgid为该消息队列在内核中记录所有消息队列信息的数组的下标,因此通过迭代所有的下标可以获得系统中所有消息队列的相关信息
buf
:指向用于存储消息队列信息的结构体的指针。如果函数执行成功,则返回 0。否则,返回一个非 0 值。
msgctl()
函数用于控制消息队列。它能够对指定的消息队列执行各种操作,例如,查询消息队列信息、更改消息队列的标志、删除消息队列等。该函数的行为取决于 cmd
参数的值。例如,如果将 cmd
设为 IPC_STAT
,则 msgctl()
函数会将指定消息队列的信息存储到 buf
所指向的结构体中。
3. 相关结构体定义
3.1 属性信息结构体
IPC_STAT获得的属性信息被存放在以下结构体中:
struct msqid_ds {
struct ipc_perm msg_perm; /*权限相关信息*/
time_t msg_stime; /*最后一次发送消息的时间*/
time_t msg_rtime; /*最后一次接收消息的时间*/
time_t msg_ctime; /*最后一次状态变更的时间*/
unsigned long _msg_cbytes; /*当前消息队列中的数据尺寸*/
msgqnum_t msg_qnum; /*当前消息队列中的消息个数*/
msglen_t msg_qbytes; /*消息队列的最大数据尺寸*/
pid_t msg_lspid; /*最后一个发送消息的进程PID*/
pid_t msg_lrpid; /*最后一个接收消息的进程PID*/
};
3.2 权限信息结构体
struct ipc perm {
key _t _key; /* 当前消息队列的键值key */
uid_t uid; /* 当前消息队列所有者的有效UID*/
gid_t gid; /*当前消息队列所有者的有效GID */
uid_t cuid; /* 当前消息队列创建者的有效UID */
gid_t cgid; /*当前消息队列创建者的有效GID*/
unsigned short mode; /*消息队列的读写权限*/
unsigned shortseq; /*序列号*/
};
3.3 定义结构体
当使用IPC_INFO时,需要定义一个如下结构体来获取系统关于消息队列的限制值信息,并且将这个结构体指针强制类型转化为第三个参数的类型。
struct msginfo {
int msgpool; /*系统消息总尺寸(千字节为单位)最大值*/
int msgmap; /*系统消息个数最大值*/
int msgmax; /*系统单个消息尺寸最大值*/
int msgmnb; /*写入消息队列字节数最大值*/
int msgmni; /*系统消息队列个数最大值*/
int msgssz; /*消息段尺寸*/
int msgtql; /*系统中所有消息队列中的消息总数最大值*/
unsigned short int msgseg; /*分配给消息队列的数据段的最大值*/
};
3.3 注意点
当使用选项MSG_INFO时,跟 IPC_INFO一样也是获得一个msginfo结构体的信息,但是有如下几点不同:
- 成员 msgpool记录的是系统当前存在的MSG的个数总和。
- 成员msgmap记录的是系统当前所有MSG中的消息个数总和。
- 成员msgtql记录的是系统当前所有MSG中的所有消息的所有字节数总和。
4. 示例代码
下面的示例展示了一个进程 jack 如何使用消息队列给另一个进程 rose 发送消息的过程,以及如何使用msgctl( )函数,删除不再使用的消息队列:
- head.h
#ifndef _HEAD_H_
#define _HEAD_H_
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <errno.h>
#include <time.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "head.h"
#include <signal.h>
#include <sys/wait.h>
#define PATH "./"
#define PROJ_ID 1
#define MSGSIZE 100 //消息正文的大小
#define JTOR 1 //jack发给rose的消息
#define RTOJ 2 //rose发给jack的消息
struct msgbuf
{
long mtype; //消息类型
char mtext[MSGSIZE]; //消息正文
};
#endif
- rose.c
#include "head.h"
#include <singal.h>
void child(int sig)
{
if(sig == SIGCHLD) {
printf("通道已关闭\n");
exit(0);
}
}
int main(int argc, char *argv[])
{
//获取一个IPC对象的key
key_t key = ftok(PATH, PROJ_ID);
if(key == -1) {
perror("ftok() fail");
exit(0);
}
//获取一个消息队列的ID
int msgid = msgget(key, IPC_CREAT|0666);
if(msgid == -1) {
perror("msgget() fail");
exit(0);
}
struct msgbuf msg;
pid_t pid = fork();
//创建一个子进程,专门接收jack的消息
if(pid == -1) {
perror("fork()() fail");
exit(0);
}
//子进程
if(pid == 0) {
while(1) {
bzero(&msg, sizeof(msg)); //先清空消息结构体变量
if(msgrcv(msgid, &msg, MSGSIZE, JTOR, 0) == -1) {
perror("msgrcv() fail");
exit(0);
}
if( !strncmp(msg.mtext, "bye", 3) )
{
//先把通道关闭(把消息队列删掉)
msgctl(msgid, IPC_RMID, NULL);
exit(0);
}
printf("from jack: %s\n", msg.mtext);
}
}
//父进程
if(pid > 0) {
signal(SIGCHLD, child);
while(1) {
bzero(&msg, sizeof(msg)); //先清空消息结构体变量
msg.mtype = RTOJ;
printf("请输入要发送给jack的消息\n");
fgets(msg.mtext, MSGSIZE, stdin); //从键盘获取最多不超过100个字节的信息
msgsnd(msgid, &msg, MSGSIZE, 0);
if( !strncmp(msg.mtext, "bye", 3) )
break;
}
}
//等待子进程
wait(NULL);
return 0;
}
- jack.c
#include "head.h"
#include <singal.h>
void child(int sig)
{
if(sig == SIGCHLD) {
printf("通道已关闭\n");
exit(0);
}
}
int main(int argc, char *argv[])
{
//获取一个IPC对象的key
key_t key = ftok(PATH, PROJ_ID);
if(key == -1) {
perror("ftok() fail");
exit(0);
}
//获取一个消息队列的ID
int msgid = msgget(key, IPC_CREAT|0666);
if(msgid == -1) {
perror("msgget() fail");
exit(0);
}
struct msgbuf msg;
pid_t pid = fork();
//创建一个子进程,专门接收rose的消息
if(pid == -1) {
perror("fork()() fail");
exit(0);
}
//子进程
if(pid == 0) {
while(1) {
bzero(&msg, sizeof(msg)); //先清空消息结构体变量
if(msgrcv(msgid, &msg, MSGSIZE, RTOJ, 0) == -1) {
perror("msgrcv() fail");
exit(0);
}
if( !strncmp(msg.mtext, "bye", 3) )
{
//先把通道关闭(把消息队列删掉)
msgctl(msgid, IPC_RMID, NULL);
exit(0);
}
printf("from rose: %s\n", msg.mtext);
}
}
//父进程
if(pid > 0) {
signal(SIGCHLD, child);
while(1) {
bzero(&msg, sizeof(msg)); //先清空消息结构体变量
msg.mtype = JTOR;
printf("请输入要发送给rose的消息\n");
fgets(msg.mtext, MSGSIZE, stdin); //从键盘获取最多不超过100个字节的信息
msgsnd(msgid, &msg, MSGSIZE, 0);
if( !strncmp(msg.mtext, "bye", 3) )
break;
}
}
//等待子进程
wait(NULL);
return 0;
}