Unix IPC 之消息隊列
1. 概述
Unix/Linux 是多任務的操作系統,它通過多個進程分別處理不同事務來實現,如果多個進程要進行協同工作或者爭用同一個資源,互相之間的通訊就很有必要了。
進程間通信(Inter-Process Communication
,又稱IPC
)是指一組由操作系統支持的機制,用於進程間的交互(如協調或通信)。在 UNIX/Linux 下主要有以下幾種方式:
(1)管道(有名管道(pipe
)和無名管道(fifo
))(2)信號(signal
)(3)信號量(semaphore
)(4)消息隊列(message
queues
)(5)共享內存(shared
memory
)(6)套接字(socket
)
2. 什麼是消息隊列
消息隊列提供了一種將數據塊從一個進程發送到另一個進程的方法。每個數據塊都被認爲有一個類型,它可以實現消息的隨機查詢,消息不必按照先入先出的順序讀取,用戶可以根據消息的類型讀取它們。
和有名(pipe
)/ 無名(fifo
)管道一樣,當從消息隊列中讀取一條消息的時候,消息隊列中相應的數據會被刪除。此外,它允許一個或多個進程向其寫入或讀取消息。每個消息隊列都有一個消息隊列標識,在整個系統中是唯一的;而且消息隊列只有在內核重啓或是手動刪除(ipcrm
)消息隊列時纔會被刪除。如果不手動刪除消息隊列,則消息隊列將會一直存在於系統內核中。
消息隊列和管道有着同樣的缺點,即每個數據塊的最大長度是上限,系統上所有隊列的最大長度也是上限。每條消息的最大長度爲最大值(MSGMAX
),每個消息隊列的總字節數爲最大值(MSGMNB
),系統上的消息隊列總數也是最大值(MSGMNI
)。
宏MSGMNI
、MSGMAX
和MSGMNB
聲明於/inclue/uapi/linux/msg.h
頭文件中。其默認值如下所示:
#define MSGMNI 32000 /* <= IPCMNI */ /* max # of msg queue identifiers */
#define MSGMAX 8192 /* <= INT_MAX */ /* max size of message (bytes) */
#define MSGMNB 16384 /* <= INT_MAX */ /* default max size of a message queue */
在/proc/sys/kernel/
目錄下,有相應的msgmni
、msgmax
和msgmnb
這3
個文件,裏面記錄了當前內核中與消息隊列相關的幾個參數值。sysctl
或系統函數sysctl()
進行修改。也可直接在/etc/sysctl.conf
文件中進行內核配置。kernel
)提供的一個鏈表,它實現了一個基於鏈表的數據結構。消息隊列是基於消息的,而管道是基於字節流的,在某個進程往一個隊列寫入消息之前,並不需要另外某個進程在該消息隊列上等待消息的到達。
對於有名管道(pipe
)和無名管道(fifo
),最後一次關閉發生時,仍在該管道(pipe
或fifo
)上的數據將被丟棄。有名管道(pipe
)或無名管道(fifo
)都是隨進程持續的,而消息隊列、信號量、共享內存都是隨內核持續的。
3. 消息隊列實現
在第一次創建消息隊列時,內核會爲其分配關聯的隊列控制塊(Queue
Control
Block
, QCB
)、消息隊列名稱、唯一 ID、內核緩衝區,隊列長度、最大消息長度以及一個或多個等待列表。內核還採用開發人員提供的參數(例如隊列長度和最大消息長度)來確定消息隊列需要多少內存。內核在獲得信息後,會從系統內存或某些私有內存空間爲消息隊列分配空間。
消息隊列本身由許多元素組成,每個元素可以保存一條消息。其中保存第一條和最後一條消息的元素稱爲:頭和尾。消息隊列的內部原理大致如下圖示所示:
消息隊列具有兩個相關聯的任務的等待列表。接收任務等待列表由在隊列爲空時等待隊列的任務組成。發送隊列由隊列滿時等待隊列的任務完成。
系統中記錄消息隊列的數據結構是struct
ipc_ids
msg_ids
,它位於內核中,系統中所有的消息隊列都可以在結構msg_ids
中找到入口。
從 Linux 內核5.4.3
版本的msg.c
源文件可以得知,系統上每個現有的消息隊列都有一個struct
msg_queue
數據結構。該數據類型聲明如下:
struct msg_queue {
struct kern_ipc_perm q_perm; /* 消息隊列操作訪問權限 */
time64_t q_stime; /* 上一次調用msgsnd發送消息的時間 */
time64_t q_rtime; /* 上一次調用msgrcv接收消息的時間 */
time64_t q_ctime; /* 上一次修改的時間 */
unsigned long q_cbytes; /* 隊列上當前字節數據 */
unsigned long q_qnum; /* 隊列中的消息數目 */
unsigned long q_qbytes; /* 隊列上最大字節數目 */
struct pid *q_lspid; /* 上一次調用msgsnd的pid */
struct pid *q_lrpid; /* 上一次接收消息的pid */
struct list_head q_messages;
struct list_head q_receivers;
struct list_head q_senders;
//這3個標註的內核鏈表用於管理睡眠的發送至(q_senders)、睡眠的接受者(q_receivers)和消息本身(q_messages)
} __randomize_layout;
該結構包含消息隊列的狀態信息以及隊列的訪問權限。成員q_stime
、q_rtime
、q_ctime
、q_cbytes
、q_qnum
、q_qbytes
、q_lspid
、q_lrpid
在上面的數據類型聲明中採用註釋的形式給出說明。
成員q_perm
用以說明該消息隊列操作訪問權限。對於struct
ipc_perm
數據類型,其聲明如下:
struct ipc_perm
{
__kernel_key_t key; //消息隊列全局唯一ID
__kernel_uid_t uid; //所有者有效UID
__kernel_gid_t gid; //所有者有效GID
__kernel_uid_t cuid; //創建者有效UID
__kernel_gid_t cgid; //創建者有效GID
__kernel_mode_t mode; //權限
unsigned short seq; //序列號
};
其中最值得關注的是成員key
,其類型爲key_t
。這是系統對 IPC 資源的唯一標識符,這個標識符必須在申請 IPC 資源之前獲得,我們可以通過系統函ftok()
獲得。
而成員q_messages
、q_receivers
和q_senders
分別對應於消息本身、睡眠的接收者和睡眠的發送者。而q_messages
中的各個消息都封裝在一個struct
msg_msg
的數據類型中,該結構聲明如下:
// 雙向鏈表指針
struct list_head {
struct list_head *next;
struct list_head *prev;
};
struct msg_msg {
struct list_head m_list;
long m_type; //消息類型
int m_ts; //消息正文長度
struct msg_msgseg *next;
/* 接下來是實際的消息*/
}
成員m_list
用於連接各個消息的鏈表元素,其他成員用於管理消息本身。其中m_type
指明消息類型,m_ts
用於指定消息正文長度(字節)。從該結構聲明可以看到,struct
msg_msg
數據結構中沒有聲明成員用於存儲消息本身。這是因爲每個消息都至少分配了一個內存頁,msg_msg
實例則保存在該頁的起始處,剩餘的空間可用於存儲消息正文。這正是成員next
的作用,如果消息超過一個內存的長消息,則需要用next
(參考《深入 Linux 內核架構》)。q_receivers
)的數據結構聲明分別如下:
/* one msg_receiver structure for each sleeping receiver */
struct msg_receiver {
struct list_head r_list;
struct task_struct *r_tsk;
int r_mode;
long r_msgtype;
long r_maxsize;
struct msg_msg *r_msg;
};
該結構聲明保存了執行消息隊列進程的task_struct
的指針,以及對預期消息的描述(包括消息類型,消息最大長度等)、指向msg_msg
實例的一個指針。
而睡眠的消息生產者(q_senders
)的數據類型聲明如下:
/* one msg_sender for each sleeping sender */
struct msg_sender {
struct list_head list;
struct task_struct *tsk;
size_t msgsz;
};
list
是鏈表,tsk
是指向對應進程的task_struct
指針。
根據前面的描述與介紹可以得出,Linux 內核與消息獨立建立起來的聯繫如下圖所示:
4. 系統函數
與消息隊列相關聯的系統調用函數共有5
個,它們分別是:ftok()
、msgget()
、msgsnd()
、msgrcv()
和msgctl()
。下面將分別對這幾個系統函數作詳細的介紹。
4.1 ftok()
系統函數ftok()
的函數原型如下:
#include <sys/types.h>
#include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
該函數將一個現有的路徑名(pathname
)和一個整數標識符 (帶有proj_id
的低8
位) 轉換爲key_t
值,稱爲IPC
key
,也就是所謂的IPC
唯一祕鑰。
注意:它由兩個參數(pathname
、proj_id
)生成key
。基本上,pathname
必須是這個進程可以讀取的文件。另一個參數,proj_id
通常被設置爲任意字符,比如'K'
。ftok()
函數使用關於已命名文件的信息(比如inode
編號等)和proj_id
來爲msgget()
生成一個可能唯一的鍵(key
)。要使用相同隊列的程序必須生成相同的密鑰,因此它們必須將相同的參數傳遞給ftok()
。
4.2 msgget()
系統函數msgget()
的原型如下所示:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
該函數用於創建一個新的消息隊列,或者是獲取一個現有的消息隊列。如果該函數調用成功,返回消息隊列的msgid
作爲進程的唯一標識符;失敗則返回-1
。第一個參數key
是一個IPC
key
,可以由ftok()
函數生成,標識唯一的消息隊列。
第二個參數flag
是創建方式(IPC_CREAT
和IPC_EXCL
)。
單獨使用IPC_CREAT
表示如果請求的資源已經存在,則直接獲得創建 IPC 資源的應用程序;如果不是,則創建新的 IPC 資源。
IPC_CREAT
與IPC_EXCL
一起使用,以指示用於創建 IPC 資源的應用程序。如果請求的資源不存在,則創建一個新的 IPC 資源;如果它已經存在,則返回-1
。
單獨使用IPC_EXCL
沒有任何意義。它的存在是爲了將其與IPC_CREAT
一起使用,以確保新創建的資源是可用的。
4.3 msgsnd()
系統函數msgsnd()
用於向消息隊列添加數據,其函數原型如下:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
該函數成功返回 0,失敗返回 - 1。對於該函數的幾個參數,詳細描述如下:① msqid
msgget() 函數返回的消息隊列標識符。
② msgp
用於指向要發送的消息的指針。
③ msgsz
msg
指向的消息長度,消息緩衝區結構中mtext
的大小,不包括數據類型。
注意:msgsnd()
和msgrcv()
函數中的第二個參數(分別是msgp
和msg_ptr
)的數據類型均爲struct msgbuf
,其數據類型聲明如下(/include/uapi/linux/msg.h
文件):
/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
__kernel_long_t mtype; /* type of message */
char mtext[1]; /* message text */
};
④ msgflg
指定如何發送消息的標誌。比如:msgflg
=
IPC_NOWAIT
, 則表示如果消息不能立即發送,不阻塞進程,返回-1
,並設置錯誤碼errno
(用戶區)爲EAGAIN
。如果不設置標誌,則使用值0
。
4.4 msgrcv()
系統函數msgrcv()
用於從消息隊列中讀取消息,該函數原型如下:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
int msgflg);
對於該函數中的5
個參數,詳細解釋說明如下:① msqid
該參數表示函數msgget()
返回的消息隊列標識符。
② msgp
指向要接收的消息的指針。
③ msgsz
msgp
指向的消息的長度,消息緩衝區結構中mtext
的大小,不包括數據類型。正如上面4.3
節所描述,msgp
的數據類型是struct
msgbuf
。
④ msgtyp
我們希望閱讀的消息類型。可能是以下情況之一:
i. msgtyp
=
0
, 將返回隊列上的第一條消息。ii. msgtyp
>
0
(正整數), 隊列上類型(mtype
,即struct
msgbuf
中成員mtype
)等於此整數的第一條消息(除非msgflg
中設置了特定標誌,請參見下文⑤)。iii. msgtyp
<
0
(負整數), 隊列上類型小於或等於此整數絕對值的第一條消息。
如果msgtyp
爲零,則檢索第一條消息,而不管其類型是什麼。該值可由接收進程用於指定消息選擇。總的來說,msgflg
設置爲0
。
⑤ msgflg
該參數控制函數的行爲,以下任何標誌的邏輯 “或” 組合。
msgflg
=
IPC_NOWAIT
,隊列沒有可讀消息不等待,返回ENOMSG
錯誤。msgflg
=
MSG_EXCEPT
,如果消息類型參數爲正整數,則返回類型不等於給定整數的第一條消息。msgflg
=
MSG_NOERROR
,當消息大小超過msgze
時被截斷。
“
這個參數依然是控制函數行爲的標誌,取值可以是:0,表示忽略;IPC_NOWAIT,如果消息隊列爲空,則返回一個 ENOMSG,並將控制權交回調用函數的進程。如果不指定這個參數,那麼進程將被阻塞直到函數可以從隊列中得到符合條件的消息爲止。如果一個 client 正在等待消息的時候隊列被刪除,EIDRM 就會被返回。如果進程在阻塞等待過程中收到了系統的中斷信號,EINTR 就會被返回。MSG_NOERROR,如果函數取得的消息長度大於 msgsz,將只返回 msgsz 長度的信息,剩下的部分被丟棄了。如果不指定這個參數,E2BIG 將被返回,而消息則留在隊列中不被取出。當消息從隊列內取出後,相應的消息就從隊列中刪除了。
”
在 Linux 內核的5.4.3
版本中,msgflg
支持以下選項參數:
//// 聲明於/include/uapi/linux/msg.h
/* msgrcv options */
#define MSG_NOERROR 010000 /* no error if message is too big */
#define MSG_EXCEPT 020000 /* recv any msg except of specified type.*/
#define MSG_COPY 040000 /* copy (not remove) all queue messages */
特別注意:當msgtype
>
0
且 msgflg
=
MSG_EXCEPT
時,接收類型不等於msgtype
的第一條消息。
該函數調用成功時,返回實際放入接收緩衝區msgp
中的字符數。失敗返回-1
。
4.5 msgctl()
系統函數msgctl()
用於消息隊列控制功能,它不僅僅有刪除消息隊列的作用。其函數原型如下:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
該函數的各參數詳細如下描述:
① msqid
msgget()
函數返回的消息隊列標識符。
② cmd
指定要在消息隊列上執行的命令,其具體如下:
所以如果我們執行刪除操作,我們可以將cmd
設置爲IPC_RMID
。對於IPC_INFO
命令(Linux 系統特有),其對應的msginof
數據類型聲明如下(/include/uapi/linux/msg.h
文件):
/* buffer for msgctl calls IPC_INFO, MSG_INFO */
struct msginfo {
int msgpool;
int msgmap;
int msgmax;
int msgmnb;
int msgmni;
int msgssz;
int msgtql;
unsigned short msgseg;
};
③ buf
如果選項刪除隊列,則第3
個參數buf
傳NULL
。參數buf
的數據類型是 struct
msqid_ds
,其數據類型聲明如下所示:
/* Obsolete, used only for backwards compatibility and libc5 compiles */
struct msqid_ds {
struct ipc_perm msg_perm;
struct msg *msg_first; /* first message on queue,unused */
struct msg *msg_last; /* last message in queue,unused */
__kernel_time_t msg_stime; /* last msgsnd time */
__kernel_time_t msg_rtime; /* last msgrcv time */
__kernel_time_t msg_ctime; /* last change time */
unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */
unsigned long msg_lqbytes; /* ditto */
unsigned short msg_cbytes; /* current number of bytes on queue */
unsigned short msg_qnum; /* number of messages in queue */
unsigned short msg_qbytes; /* max number of bytes on queue */
__kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
__kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};
5. 與消息隊列相關的 shell 命令
與消息隊列相關的 shell 命令有兩個,分別是:ipcs
和 ipcrm
。它們分別用來查看消息隊列以及刪除消息隊列;除此之外,這兩個命令還可以用於共享內存、信號量。這些都將在後面的相關文章中詳細進行講解,本文只着重說明與消息隊列相關的選項。
5.1 查看消息隊列
使用ipcs -q
可以查看當前消息隊列中的消息情況。
如下圖所示,一開始內核中沒有消息隊列,然後我們啓動write
(實現寫消息隊列的demo
)應用程序,之後向該消息隊列中寫入HELLO
字段串,之後再次使用ipcs -q
便可查看到消息隊列向的key
和msqid
以及消息數量等信息。
5.2 刪除消息隊列
現在使用ipcrm
將該消息隊列中的所有消息刪掉,如下所示,可以選擇根據key
或msqid
去刪除,這裏選項的是根據msqid
進行刪除。
6. 消息隊列實戰
· 實例一該實例中,生產者不斷地向消息隊列中寫入用戶終端輸入的消息,直到輸入 “over
” 字符串後,結束該過程。
生產者代碼:
/// 向消息隊列寫入消息:sender.c文件; gcc sender.c -o sender
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/msg.h>
struct msgbuf{
int mtype;
char mtext[128];
};
int main(int argc,char *argv[])
{
int msgid;
char buf[128] = {0};
key_t key = 11111;
struct msgbuf sndbuf;
if((msgid = msgget(key, 0666 | IPC_CREAT)) < 0)
{
perror("msgget");
exit(-1);
}
while(1)
{
printf("Enter some text: ");
memset(&sndbuf, 0x00, sizeof(sndbuf));
sndbuf.mtype = 1;
memset(buf, 0x00, sizeof(buf));
fgets(buf, sizeof(buf), stdin);
strncpy(sndbuf.mtext, buf, strlen(buf));
if(msgsnd(msgid,(void *)&sndbuf, sizeof(sndbuf), 0) < 0 )
{
perror("msgsnd");
exit(-1);
}
}
return 0;
}
消費者代碼:
/// 從消息隊列中讀取消息;receiver.c文件;gcc receiver.c -o receiver
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/msg.h>
#include <ctype.h>
struct msgbuf{
int mtype;
char mtext[128];
};
int main(int argc,char *argv[])
{
int msgid;
char buf[128] = {0};
key_t key = 11111;
struct msgbuf rcvbuf;
if((msgid = msgget(key, 0666 | IPC_CREAT)) < 0)
{
perror("msgget");
exit(-1);
}
while(1)
{
memset(&rcvbuf, 0x00, sizeof(rcvbuf));
if(msgrcv(msgid, (void *)&rcvbuf, sizeof(rcvbuf), 0, 0) < 0)
{
perror("msgrcv");
exit(-1);
}
// 移除掉fgets函數讀取到的換行符。
if(isspace(rcvbuf.mtext[strlen(rcvbuf.mtext) - 1]))
{
rcvbuf.mtext[strlen(rcvbuf.mtext) - 1] = '\0';
}
printf("rcv msg[%s]\n", rcvbuf.mtext);
if(!strncmp(rcvbuf.mtext, "over", 4))
{
puts("over cycle.");
break;
}
}
msgctl(msgid, IPC_RMID, NULL);
return 0;
}
分別打開兩個 shell 終端,一個紙箱sender
進程,另外一個紙箱receiver
進程。在sender
進程中,用戶不斷從終端輸入消息,而receiver
終端則會將所從消息隊列中獲取到的消息打印到出來,直到讀取到字符 “over
” 則從內核中刪除該消息隊列。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/gmtEjV8blK8NLzBcSHgiew