Unix IPC 之消息隊列

1. 概述

Unix/Linux 是多任務的操作系統,它通過多個進程分別處理不同事務來實現,如果多個進程要進行協同工作或者爭用同一個資源,互相之間的通訊就很有必要了。

進程間通信(Inter-Process Communication,又稱IPC)是指一組由操作系統支持的機制,用於進程間的交互(如協調或通信)。在 UNIX/Linux 下主要有以下幾種方式:

(1)管道(有名管道(pipe)和無名管道(fifo))(2)信號(signal)(3)信號量(semaphore)(4)消息隊列(message queues)(5)共享內存(shared memory)(6)套接字(socket

2. 什麼是消息隊列

消息隊列提供了一種將數據塊從一個進程發送到另一個進程的方法。每個數據塊都被認爲有一個類型,它可以實現消息的隨機查詢,消息不必按照先入先出的順序讀取,用戶可以根據消息的類型讀取它們。

和有名(pipe)/ 無名(fifo)管道一樣,當從消息隊列中讀取一條消息的時候,消息隊列中相應的數據會被刪除。此外,它允許一個或多個進程向其寫入或讀取消息。每個消息隊列都有一個消息隊列標識,在整個系統中是唯一的;而且消息隊列只有在內核重啓或是手動刪除(ipcrm)消息隊列時纔會被刪除。如果不手動刪除消息隊列,則消息隊列將會一直存在於系統內核中。

消息隊列和管道有着同樣的缺點,即每個數據塊的最大長度是上限,系統上所有隊列的最大長度也是上限。每條消息的最大長度爲最大值(MSGMAX),每個消息隊列的總字節數爲最大值(MSGMNB),系統上的消息隊列總數也是最大值(MSGMNI)。

MSGMNIMSGMAXMSGMNB聲明於/inclue/uapi/linux/msg.h頭文件中。其默認值如下所示:

#define MSGMNI 32000   /* <= IPCMNI */     /* max # of msg queue identifiers */
#define MSGMAX  8192   /* <= INT_MAX */   /* max size of message (bytes) */
#define MSGMNB 16384   /* <= INT_MAX */   /* default max size of a message queue */

/proc/sys/kernel/目錄下,有相應的msgmnimsgmaxmsgmnb3個文件,裏面記錄了當前內核中與消息隊列相關的幾個參數值。當然,這些默認值可通過命令sysctl或系統函數sysctl()進行修改。也可直接在/etc/sysctl.conf文件中進行內核配置。消息隊列的本質其實是內核(kernel)提供的一個鏈表,它實現了一個基於鏈表的數據結構。消息隊列是基於消息的,而管道是基於字節流的,在某個進程往一個隊列寫入消息之前,並不需要另外某個進程在該消息隊列上等待消息的到達。

對於有名管道(pipe)和無名管道(fifo),最後一次關閉發生時,仍在該管道(pipefifo)上的數據將被丟棄。有名管道(pipe)或無名管道(fifo)都是隨進程持續的,而消息隊列、信號量、共享內存都是隨內核持續的。

3. 消息隊列實現

在第一次創建消息隊列時,內核會爲其分配關聯的隊列控制塊(Queue Control BlockQCB)、消息隊列名稱、唯一 ID、內核緩衝區,隊列長度、最大消息長度以及一個或多個等待列表。內核還採用開發人員提供的參數(例如隊列長度和最大消息長度)來確定消息隊列需要多少內存。內核在獲得信息後,會從系統內存或某些私有內存空間爲消息隊列分配空間。

消息隊列本身由許多元素組成,每個元素可以保存一條消息。其中保存第一條和最後一條消息的元素稱爲:頭和尾。消息隊列的內部原理大致如下圖示所示:

消息隊列具有兩個相關聯的任務的等待列表。接收任務等待列表由在隊列爲空時等待隊列的任務組成。發送隊列由隊列滿時等待隊列的任務完成。

系統中記錄消息隊列的數據結構是struct ipc_ids msg_ids,它位於內核中,系統中所有的消息隊列都可以在結構msg_ids中找到入口。

從 Linux 內核5.4.3版本的msg.c源文件可以得知,系統上每個現有的消息隊列都有一個struct msg_queue數據結構。該數據類型聲明如下:

struct msg_queue {
 struct kern_ipc_perm q_perm; /* 消息隊列操作訪問權限 */
 time64_t q_stime;    /* 上一次調用msgsnd發送消息的時間 */
 time64_t q_rtime;    /* 上一次調用msgrcv接收消息的時間 */
 time64_t q_ctime;    /* 上一次修改的時間 */
 unsigned long q_cbytes;   /* 隊列上當前字節數據 */
 unsigned long q_qnum;   /* 隊列中的消息數目 */
 unsigned long q_qbytes;   /* 隊列上最大字節數目 */
 struct pid *q_lspid;   /* 上一次調用msgsnd的pid */
 struct pid *q_lrpid;   /* 上一次接收消息的pid */

 struct list_head q_messages;
 struct list_head q_receivers;
 struct list_head q_senders;
 //這3個標註的內核鏈表用於管理睡眠的發送至(q_senders)、睡眠的接受者(q_receivers)和消息本身(q_messages)
} __randomize_layout;

該結構包含消息隊列的狀態信息以及隊列的訪問權限。成員q_stimeq_rtimeq_ctimeq_cbytesq_qnumq_qbytesq_lspidq_lrpid在上面的數據類型聲明中採用註釋的形式給出說明。

成員q_perm用以說明該消息隊列操作訪問權限。對於struct ipc_perm數據類型,其聲明如下:

struct ipc_perm
{
 __kernel_key_t key;  //消息隊列全局唯一ID
 __kernel_uid_t uid; //所有者有效UID
 __kernel_gid_t gid; //所有者有效GID
 __kernel_uid_t cuid; //創建者有效UID
 __kernel_gid_t cgid; //創建者有效GID
 __kernel_mode_t mode;  //權限
 unsigned short seq; //序列號
};

其中最值得關注的是成員key,其類型爲key_t。這是系統對 IPC 資源的唯一標識符,這個標識符必須在申請 IPC 資源之前獲得,我們可以通過系統函ftok()獲得。

而成員q_messagesq_receiversq_senders分別對應於消息本身、睡眠的接收者和睡眠的發送者。而q_messages中的各個消息都封裝在一個struct msg_msg的數據類型中,該結構聲明如下:

//  雙向鏈表指針
struct list_head {
 struct list_head *next;
 struct list_head *prev;
};

struct msg_msg {
 struct list_head m_list;
 long  m_type; //消息類型
 int  m_ts; //消息正文長度
 struct msg_msgseg *next;
 /* 接下來是實際的消息*/
}

成員m_list用於連接各個消息的鏈表元素,其他成員用於管理消息本身。其中m_type指明消息類型,m_ts用於指定消息正文長度(字節)。從該結構聲明可以看到,struct msg_msg數據結構中沒有聲明成員用於存儲消息本身。這是因爲每個消息都至少分配了一個內存頁,msg_msg實例則保存在該頁的起始處,剩餘的空間可用於存儲消息正文。這正是成員next的作用,如果消息超過一個內存的長消息,則需要用next(參考《深入 Linux 內核架構》)。此外,睡眠消息接收者(q_receivers)的數據結構聲明分別如下:

/* one msg_receiver structure for each sleeping receiver */
struct msg_receiver {
 struct list_head r_list;
 struct task_struct *r_tsk;

 int    r_mode;
 long   r_msgtype;
 long   r_maxsize;

 struct msg_msg  *r_msg;
};

該結構聲明保存了執行消息隊列進程task_struct的指針,以及對預期消息的描述(包括消息類型,消息最大長度等)、指向msg_msg實例的一個指針。

而睡眠的消息生產者(q_senders)的數據類型聲明如下:

/* one msg_sender for each sleeping sender */
struct msg_sender {
 struct list_head  list;
 struct task_struct  *tsk;
 size_t                  msgsz;
};

list是鏈表,tsk是指向對應進程的task_struct指針。

根據前面的描述與介紹可以得出,Linux 內核與消息獨立建立起來的聯繫如下圖所示:

4. 系統函數

與消息隊列相關聯的系統調用函數共有5個,它們分別是:ftok()msgget()msgsnd()msgrcv()msgctl()。下面將分別對這幾個系統函數作詳細的介紹。

4.1 ftok()

系統函數ftok()的函數原型如下:

#include <sys/types.h>
#include <sys/ipc.h>

key_t ftok(const char *pathname, int proj_id);

該函數將一個現有的路徑名(pathname)和一個整數標識符 (帶有proj_id的低8位) 轉換爲key_t值,稱爲IPC key,也就是所謂的IPC 唯一祕鑰。

注意:它由兩個參數(pathnameproj_id)生成key。基本上,pathname必須是這個進程可以讀取的文件。另一個參數,proj_id通常被設置爲任意字符,比如'K'ftok()函數使用關於已命名文件的信息(比如inode編號等)和proj_id來爲msgget()生成一個可能唯一的鍵(key)。要使用相同隊列的程序必須生成相同的密鑰,因此它們必須將相同的參數傳遞給ftok()

4.2 msgget()

系統函數msgget()的原型如下所示:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

該函數用於創建一個新的消息隊列,或者是獲取一個現有的消息隊列。如果該函數調用成功,返回消息隊列的msgid作爲進程的唯一標識符;失敗則返回-1。第一個參數key是一個IPC key,可以由ftok()函數生成,標識唯一的消息隊列。

第二個參數flag是創建方式(IPC_CREATIPC_EXCL)。

單獨使用IPC_CREAT表示如果請求的資源已經存在,則直接獲得創建 IPC 資源的應用程序;如果不是,則創建新的 IPC 資源。

IPC_CREATIPC_EXCL一起使用,以指示用於創建 IPC 資源的應用程序。如果請求的資源不存在,則創建一個新的 IPC 資源;如果它已經存在,則返回-1

單獨使用IPC_EXCL沒有任何意義。它的存在是爲了將其與IPC_CREAT一起使用,以確保新創建的資源是可用的。

4.3 msgsnd()

系統函數msgsnd()用於向消息隊列添加數據,其函數原型如下:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

該函數成功返回 0,失敗返回 - 1。對於該函數的幾個參數,詳細描述如下:① msqid

msgget() 函數返回的消息隊列標識符。

msgp

用於指向要發送的消息的指針。

msgsz

msg指向的消息長度,消息緩衝區結構中mtext的大小,不包括數據類型。

注意:msgsnd()msgrcv()函數中的第二個參數(分別是msgpmsg_ptr)的數據類型均爲struct msgbuf,其數據類型聲明如下(/include/uapi/linux/msg.h文件):

/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
 __kernel_long_t mtype;          /* type of message */
 char mtext[1];                  /* message text */
};

msgflg

指定如何發送消息的標誌。比如:msgflg = IPC_NOWAIT, 則表示如果消息不能立即發送,不阻塞進程,返回-1,並設置錯誤碼errno(用戶區)爲EAGAIN。如果不設置標誌,則使用值0

4.4 msgrcv()

系統函數msgrcv()用於從消息隊列中讀取消息,該函數原型如下:

 #include <sys/types.h>
 #include <sys/ipc.h>
 #include <sys/msg.h>

 ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
                 int msgflg);

對於該函數中的5個參數,詳細解釋說明如下:① msqid

該參數表示函數msgget()返回的消息隊列標識符。

msgp

指向要接收的消息的指針。

msgsz

msgp指向的消息的長度,消息緩衝區結構中mtext的大小,不包括數據類型。正如上面4.3節所描述,msgp的數據類型是struct msgbuf

msgtyp

我們希望閱讀的消息類型。可能是以下情況之一:

i.     msgtyp = 0, 將返回隊列上的第一條消息。ii. msgtyp > 0(正整數), 隊列上類型(mtype,即struct msgbuf中成員mtype)等於此整數的第一條消息(除非msgflg中設置了特定標誌,請參見下文⑤)。iii. msgtyp < 0(負整數), 隊列上類型小於或等於此整數絕對值的第一條消息。

如果msgtyp爲零,則檢索第一條消息,而不管其類型是什麼。該值可由接收進程用於指定消息選擇。總的來說,msgflg設置爲0

msgflg

該參數控制函數的行爲,以下任何標誌的邏輯 “或” 組合。

msgflg = IPC_NOWAIT,隊列沒有可讀消息不等待,返回ENOMSG錯誤。msgflg = MSG_EXCEPT,如果消息類型參數爲正整數,則返回類型不等於給定整數的第一條消息。msgflg = MSG_NOERROR,當消息大小超過msgze時被截斷。

這個參數依然是控制函數行爲的標誌,取值可以是:0,表示忽略;IPC_NOWAIT,如果消息隊列爲空,則返回一個 ENOMSG,並將控制權交回調用函數的進程。如果不指定這個參數,那麼進程將被阻塞直到函數可以從隊列中得到符合條件的消息爲止。如果一個 client 正在等待消息的時候隊列被刪除,EIDRM 就會被返回。如果進程在阻塞等待過程中收到了系統的中斷信號,EINTR 就會被返回。MSG_NOERROR,如果函數取得的消息長度大於 msgsz,將只返回 msgsz 長度的信息,剩下的部分被丟棄了。如果不指定這個參數,E2BIG 將被返回,而消息則留在隊列中不被取出。當消息從隊列內取出後,相應的消息就從隊列中刪除了。

在 Linux 內核的5.4.3版本中,msgflg支持以下選項參數:

//// 聲明於/include/uapi/linux/msg.h
/* msgrcv options */
#define MSG_NOERROR     010000  /* no error if message is too big */
#define MSG_EXCEPT      020000  /* recv any msg except of specified type.*/
#define MSG_COPY        040000  /* copy (not remove) all queue messages */

特別注意:當msgtype > 0msgflg = MSG_EXCEPT時,接收類型不等於msgtype的第一條消息。

該函數調用成功時,返回實際放入接收緩衝區msgp中的字符數。失敗返回-1

4.5 msgctl()

系統函數msgctl()用於消息隊列控制功能,它不僅僅有刪除消息隊列的作用。其函數原型如下:

 #include <sys/types.h>
 #include <sys/ipc.h>
 #include <sys/msg.h>

 int msgctl(int msqid, int cmd, struct msqid_ds *buf);

該函數的各參數詳細如下描述:

msqid

msgget()函數返回的消息隊列標識符。

cmd

指定要在消息隊列上執行的命令,其具體如下:

所以如果我們執行刪除操作,我們可以將cmd設置爲IPC_RMID。對於IPC_INFO命令(Linux 系統特有),其對應的msginof數據類型聲明如下(/include/uapi/linux/msg.h文件):

/* buffer for msgctl calls IPC_INFO, MSG_INFO */
struct msginfo {
 int msgpool;
 int msgmap; 
 int msgmax; 
 int msgmnb; 
 int msgmni; 
 int msgssz; 
 int msgtql; 
 unsigned short  msgseg; 
};

buf

如果選項刪除隊列,則第3個參數bufNULL。參數buf的數據類型是 struct msqid_ds,其數據類型聲明如下所示:

/* Obsolete, used only for backwards compatibility and libc5 compiles */
struct msqid_ds {
 struct ipc_perm msg_perm;
 struct msg *msg_first;  /* first message on queue,unused  */
 struct msg *msg_last;  /* last message in queue,unused */
 __kernel_time_t msg_stime; /* last msgsnd time */
 __kernel_time_t msg_rtime; /* last msgrcv time */
 __kernel_time_t msg_ctime; /* last change time */
 unsigned long  msg_lcbytes; /* Reuse junk fields for 32 bit */
 unsigned long  msg_lqbytes; /* ditto */
 unsigned short msg_cbytes; /* current number of bytes on queue */
 unsigned short msg_qnum; /* number of messages in queue */
 unsigned short msg_qbytes; /* max number of bytes on queue */
 __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
 __kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};

5. 與消息隊列相關的 shell 命令

與消息隊列相關的 shell 命令有兩個,分別是:ipcsipcrm。它們分別用來查看消息隊列以及刪除消息隊列;除此之外,這兩個命令還可以用於共享內存、信號量。這些都將在後面的相關文章中詳細進行講解,本文只着重說明與消息隊列相關的選項。

5.1 查看消息隊列

使用ipcs -q可以查看當前消息隊列中的消息情況。

如下圖所示,一開始內核中沒有消息隊列,然後我們啓動write(實現寫消息隊列的demo)應用程序,之後向該消息隊列中寫入HELLO字段串,之後再次使用ipcs -q便可查看到消息隊列向的keymsqid以及消息數量等信息。

5.2 刪除消息隊列

現在使用ipcrm將該消息隊列中的所有消息刪掉,如下所示,可以選擇根據keymsqid去刪除,這裏選項的是根據msqid進行刪除。

6. 消息隊列實戰

· 實例一該實例中,生產者不斷地向消息隊列中寫入用戶終端輸入的消息,直到輸入 “over” 字符串後,結束該過程。

生產者代碼:

/// 向消息隊列寫入消息:sender.c文件;   gcc sender.c -o sender
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include <errno.h>
#include <unistd.h>
#include <sys/msg.h>

struct msgbuf{
        int mtype;
        char mtext[128];
};

int main(int argc,char *argv[])
{
        int msgid;
        char buf[128] = {0};
        key_t key = 11111;

        struct msgbuf sndbuf;

        if((msgid = msgget(key, 0666 | IPC_CREAT)) < 0)
        {
                perror("msgget");
                exit(-1);
        }

        while(1)
        {
                printf("Enter some text: ");

                memset(&sndbuf, 0x00, sizeof(sndbuf));
                sndbuf.mtype = 1;

                memset(buf, 0x00, sizeof(buf));
                fgets(buf, sizeof(buf), stdin);

                strncpy(sndbuf.mtext, buf, strlen(buf));
                if(msgsnd(msgid,(void *)&sndbuf, sizeof(sndbuf), 0) < 0 )
                {
                        perror("msgsnd");
                        exit(-1);
                }
        }

        return 0;
}

消費者代碼:

/// 從消息隊列中讀取消息;receiver.c文件;gcc receiver.c -o receiver
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include <errno.h>
#include <unistd.h>
#include <sys/msg.h>

#include <ctype.h>

struct msgbuf{
        int mtype;
        char mtext[128];
};

int main(int argc,char *argv[])
{
        int msgid;
        char buf[128] = {0};
        key_t key = 11111;

        struct msgbuf rcvbuf;

        if((msgid = msgget(key, 0666 | IPC_CREAT)) < 0)
        {
                perror("msgget");
                exit(-1);
        }

        while(1)
        {
                memset(&rcvbuf, 0x00, sizeof(rcvbuf));
                if(msgrcv(msgid, (void *)&rcvbuf, sizeof(rcvbuf), 0,  0) < 0)
                {
                        perror("msgrcv");
                        exit(-1);
                }
                
    // 移除掉fgets函數讀取到的換行符。
                if(isspace(rcvbuf.mtext[strlen(rcvbuf.mtext) - 1]))
                {
                        rcvbuf.mtext[strlen(rcvbuf.mtext) - 1] = '\0';
                }

                printf("rcv msg[%s]\n", rcvbuf.mtext);

                if(!strncmp(rcvbuf.mtext, "over", 4))
                {
                        puts("over cycle.");
                        break;
                }
        }
        msgctl(msgid, IPC_RMID, NULL);

        return 0;
}

分別打開兩個 shell 終端,一個紙箱sender進程,另外一個紙箱receiver進程。在sender進程中,用戶不斷從終端輸入消息,而receiver終端則會將所從消息隊列中獲取到的消息打印到出來,直到讀取到字符 “over” 則從內核中刪除該消息隊列。

KA2HEG

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/gmtEjV8blK8NLzBcSHgiew