消息队列(MSG)
1. 基本概念
Linux 消息队列是 Linux 系统中的一种进程间通信机制。它允许多个进程向一个消息队列中写入消息,或者从消息队列中读取消息。
消息队列是一种先进先出(FIFO)的数据结构,可以存储任意类型的消息。消息队列有两个指针:一个指向队列头,一个指向队列尾。写入消息的进程会向队列尾写入消息,而读取消息的进程则会从队列头读取消息。
消息队列可以用于进程间的同步和通信。例如,可以通过消息队列来实现进程间的异步通信,也可以用来实现进程间的同步。消息队列通常用于实现多线程应用程序,能够有效地提高应用程序的性能和可维护性。
Linux 消息队列的基本操作包括创建、打开、关闭、发送和接收消息等,使用方法一般是:
- 发送者:
- 获取消息队列的ID。
- 将数据放入一个附带有标识的特殊的结构体,发送给消息队列。
 
- 接收者:
- 获取消息队列的ID。
- 将指定标识的消息读出。
 
当发送者和接收者都不再使用消息队列时,需要及时删除它以释放系统资源。
2. 消息队列API
2.1 获取消息队列ID
msgget() 用于获取一个消息队列的描述符。该函数的原型如下:
int msgget(key_t key, int msgflg);
该函数接受两个参数:
key:消息队列的键值。- xxxxxxxxxx1 1int shmget(key_t key, size_t size, int shmflg);jsx showLineNumbers
如果函数执行成功,则返回一个消息队列的描述符。否则,返回
-1。
msgget() 函数用于获取一个消息队列的描述符。它能够通过消息队列的键值来查找消息队列,并返回一个用于操作消息队列的描述符。例如,可以使用 msgget() 函数来获取一个消息队列的描述符,然后通过该描述符来向消息队列发送消息。
2.2 发送消息
msgsnd()用于向消息队列中写入消息。该函数的原型如下:
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
该函数接受四个参数:
msqid:消息队列的描述符。
msgp:指向要写入消息队列的消息缓冲区的指针。
msgsz:要写入消息队列的消息的大小。
msgflg:控制写入消息队列的方式的标志。如果函数执行成功,则返回 0。否则,返回一个非 0 值。
msgsnd() 函数用于向消息 队列中写入消息。它能够将一个消息写入指定的消息队列,并在写入成功时返回。例如,可以使用 msgget() 函数获取消息队列的描述符,然后使用 msgsnd() 函数向该消息队列中写入消息。
- 选项msgflg是一个位屏蔽字,因此IPC_CREAT、IPC_EXCL和权限mode可以用位或的方式叠加起来,比如: msgget(key, IPC_CREAT\0666);表示如果key对应的消息队列不存在就创建,且权限指定为 0666,若已存在则直接获取 ID。
- 权限只有读和写,执行权限是无效的,例如0777跟0666是等价的。
- 当key被指定为IPC_PRVATE 时,系统会自动产生一个未用的 key来对应一个新的消息队列对象。一般用于线程间通信。
2.3 接收消息
msgrcv() 是 Linux 系统中的一个函数,用于从消息队列中读取消息。该函数的原型如下:
int msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
该函数接受五个参数:
msqid:消息队列的描述符。
msgp:指向要用于存储读取到的消息的缓冲区的指针。
msgsz:要读取的消息的大小。
msgtyp:要读取的消息的类型。
msgflg:控制读取消息队列的方式的标志。如果函数执行成功,则返回实际读取到的消息的大小。否则,返回一个负值。
msgrcv() 函数用于从消息队列中读取消息。它能够从指定的消息队列中读取一条消息,并在读取成功时返回。例如,可以使用 msgget() 函数获取消息队列的描述符,然后使用 msgrcv() 函数从该消息队列中读取消息。
使用这两个收、发消息函数需要注意以下几点:
- 发送消息时,消息必须被组织成以下形式:
struct msgbuf {
	long mtype;//消息的标识
	char mtext[1];//消息的正文
}
也就是说:发送出去的消息必须以一个long型数据打头,作为该消息的标识,后面的数据则没有要求。
- 消息的标识可以是任意长整型数值,但不能是OL。
- 参数 msgsz是消息中正文的大小,不包含消息的标识。
2.4 设置或获取消息队列属性
msgctl() 是 Linux 系统中的一个函数,用于控制消息队列。该函数的原型如下:
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
该函数接受三个参数:
msqid:消息队列的描述符。
cmd:指定要进行的操作的类型。
- IPC_STAT:获取该MSG的信息,储存在结构体 msqid_ds 中。
- IPC_SET:设置该MSG的信息,储存在结构体 msqid_ds。
- IPC_RMID: 立即删除该MSG,并且唤醒所有阻塞在该MSG上的进程,同时忽略第三个参数。
- IPC_INFO:获得关于当前系统中 MSG的限制值信息。
- MSG_INFO:获得关于当前系统中MSG的相关资源消耗信息。
- MSG_STAT:同IPC_STAT,但 msgid为该消息队列在内核中记录所有消息队列信息的数组的下标,因此通过迭代所有的下标可以获得系统中所有消息队列的相关信息
buf:指向用于存储消息队列信息的结构体的指针。如果函数执行成功,则返回 0。否则,返回一个非 0 值。
msgctl() 函数用于控制消息队列。它能够对指定的消息队列执行各种操作,例如,查询消息队列信息、更改消息队列的标志、删除消息队列等。该函数的行为取决于 cmd 参数的值。例如,如果将 cmd 设为 IPC_STAT,则 msgctl() 函数会将指定消息队列的信息存储到 buf 所指向的结构体中。
3. 相关结构体定义
3.1 属性信息结构体
IPC_STAT获得的属性信息被存放在以下结构体中:
struct msqid_ds {
    struct ipc_perm msg_perm;	/*权限相关信息*/
    time_t msg_stime;			/*最后一次发送消息的时间*/
    time_t msg_rtime;			/*最后一次接收消息的时间*/
    time_t msg_ctime;			/*最后一次状态变更的时间*/
    unsigned long _msg_cbytes; 	/*当前消息队列中的数据尺寸*/
    msgqnum_t msg_qnum;			/*当前消息队列中的消息个数*/
    msglen_t msg_qbytes;		/*消息队列的最大数据尺寸*/
    pid_t msg_lspid;			/*最后一个发送消息的进程PID*/
    pid_t msg_lrpid;			/*最后一个接收消息的进程PID*/
};
3.2 权限信息结构体
struct ipc perm {
	key _t _key;			/* 当前消息队列的键值key */
	uid_t uid;				/* 当前消息队列所有者的有效UID*/
	gid_t gid;				/*当前消息队列所有者的有效GID */
	uid_t cuid;				/* 当前消息队列创建者的有效UID */
	gid_t cgid;				/*当前消息队列创建者的有效GID*/
	unsigned short mode; 	/*消息队列的读写权限*/
	unsigned shortseq; 		/*序列号*/
};
3.3 定义结构体
当使用IPC_INFO时,需要定义一个如下结构体来获取系统关于消息队列的限制值信息,并且将这个结构体指针强制类型转化为第三个参数的类型。
struct msginfo {
	int msgpool;	/*系统消息总尺寸(千字节为单位)最大值*/
    int msgmap;		/*系统消息个数最大值*/
	int msgmax;		/*系统单个消息尺寸最大值*/
    int msgmnb;		/*写入消息队列字节数最大值*/
    int msgmni;		/*系统消息队列个数最大值*/
    int msgssz;		/*消息段尺寸*/
	int msgtql;		/*系统中所有消息队列中的消息总数最大值*/
	unsigned short int msgseg; /*分配给消息队列的数据段的最大值*/
};
3.3 注意点
当使用选项MSG_INFO时,跟 IPC_INFO一样也是获得一个msginfo结构体的信息,但是有如下几点不同:
- 成员 msgpool记录的是系统当前存在的MSG的个数总和。
- 成员msgmap记录的是系统当前所有MSG中的消息个数总和。
- 成员msgtql记录的是系统当前所有MSG中的所有消息的所有字节数总和。
4. 示例代码
下面的示例展示了一个进程 jack 如何使用消息队列给另一个进程 rose 发送消息的过程,以及如何使用msgctl( )函数,删除不再使用的消息队列:
- head.h
#ifndef _HEAD_H_
#define _HEAD_H_
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <errno.h>
#include <time.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "head.h"
#include <signal.h>
#include <sys/wait.h>
#define PATH	"./"
#define PROJ_ID	1
#define MSGSIZE		100	//消息正文的大小
#define JTOR		1	//jack发给rose的消息
#define RTOJ		2	//rose发给jack的消息
struct msgbuf
{
	long mtype;				//消息类型
	char mtext[MSGSIZE];	//消息正文
};
#endif
- rose.c
#include "head.h"
#include <singal.h>
void child(int sig)
{
	if(sig == SIGCHLD) {
		printf("通道已关闭\n");
		exit(0);
	}
}
int main(int argc, char *argv[])
{
	//获取一个IPC对象的key
	key_t key = ftok(PATH, PROJ_ID);
	if(key == -1) {
		perror("ftok() fail");
		exit(0);
	}
	
	//获取一个消息队列的ID
	int msgid = msgget(key, IPC_CREAT|0666);
	if(msgid == -1) {
		perror("msgget() fail");
		exit(0);
	}
	
	struct msgbuf msg;
	pid_t pid = fork();
	
	//创建一个子进程,专门接收jack的消息
	if(pid == -1) {
		perror("fork()() fail");
		exit(0);
	}
	
	//子进程
	if(pid == 0) {
		while(1) {
			bzero(&msg, sizeof(msg));	//先清空消息结构体变量
			if(msgrcv(msgid, &msg, MSGSIZE, JTOR, 0) == -1) {
				perror("msgrcv() fail");
				exit(0);
			}
			if( !strncmp(msg.mtext, "bye", 3) )
			{
				//先把通道关闭(把消息队列删掉)
				msgctl(msgid, IPC_RMID, NULL);
				exit(0);
			}
			printf("from jack: %s\n", msg.mtext);
		}
	}
	
	//父进程
	if(pid > 0) {
		signal(SIGCHLD, child);
		while(1) {
			bzero(&msg, sizeof(msg));	//先清空消息结构体变量
			msg.mtype = RTOJ;
			printf("请输入要发送给jack的消息\n");
			fgets(msg.mtext, MSGSIZE, stdin);	//从键盘获取最多不超过100个字节的信息
			msgsnd(msgid, &msg, MSGSIZE, 0);
			if( !strncmp(msg.mtext, "bye", 3) )
				break;
		}
	}
	
	//等待子进程
	wait(NULL);
	return 0;
}
- jack.c
#include "head.h"
#include <singal.h>
    
void child(int sig)
{
	if(sig == SIGCHLD) {
		printf("通道已关闭\n");
		exit(0);
	}
}
int main(int argc, char *argv[])
{
	//获取一个IPC对象的key
	key_t key = ftok(PATH, PROJ_ID);
	if(key == -1) {
		perror("ftok() fail");
		exit(0);
	}
	
	//获取一个消息队列的ID
	int msgid = msgget(key, IPC_CREAT|0666);
	if(msgid == -1) {
		perror("msgget() fail");
		exit(0);
	}
	
	struct msgbuf msg;
	pid_t pid = fork();
	
	//创建一个子进程,专门接收rose的消息
	if(pid == -1) {
		perror("fork()() fail");
		exit(0);
	}
	
	//子进程
	if(pid == 0) {
		while(1) {
			bzero(&msg, sizeof(msg));	//先清空消息结构体变量
			if(msgrcv(msgid, &msg, MSGSIZE, RTOJ, 0) == -1) {
				perror("msgrcv() fail");
				exit(0);
			}
			if( !strncmp(msg.mtext, "bye", 3) )
			{
				//先把通道关闭(把消息队列删掉)
				msgctl(msgid, IPC_RMID, NULL);
				exit(0);
			}
			printf("from rose: %s\n", msg.mtext);
		}
	}
	
	//父进程
	if(pid > 0) {
		signal(SIGCHLD, child);
		while(1) {
			bzero(&msg, sizeof(msg));	//先清空消息结构体变量
			msg.mtype = JTOR;
			printf("请输入要发送给rose的消息\n");
			fgets(msg.mtext, MSGSIZE, stdin);	//从键盘获取最多不超过100个字节的信息
			msgsnd(msgid, &msg, MSGSIZE, 0);
			if( !strncmp(msg.mtext, "bye", 3) )
				break;
		}
	}
	
	//等待子进程
	wait(NULL);
	
	return 0;
}