• 正文
  • 相关推荐
申请入驻 产业图谱

如何设计实现一个无锁高并发的环形连续内存缓冲队列

05/01 08:25
1537
加入交流群
扫码加入
获取工程师必备礼包
参与热点资讯讨论

队列,也称作FIFO,常用数据结构之一,特点是先进先出。

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

设计思想

首先从理想的无限缓冲区到实际的使用进行设计思考。

理想化的无限缓冲区

在理想情况下,写入器(数据生产者)和读取器(数据消费者)在使用过程中,能够访问相同的、连续的、并且是无限的缓冲区。写入器和读取器各自保存一个索引,分别指向缓冲区中写和读的位置,即与之对齐的“写”和“读”指针开始进行操作。

当写入器想要在末端加入数据时,它会将数据追加到“写索引”后面的位置,之后将“写索引”移动到新写数据块的末尾。而读取器在顶端读取数据时,如果“写索引”比“读索引”位置大时,读写器就可以对已有数据进行读取操作。完成之后,读写器将“读索引”移动到读取数据块的末尾,以跟踪记录已经处理至缓冲区的哪一部分。

读取器永远不会试图读取超过“写索引”位置的数据,因为不能保证那里有有效的数据,即写入器在那里写入了东西。这也意味着“读索引”永远不能超过“写索引”。目前为止,我们假设这个理想内存系统总是连贯一致的,也就是说一旦执行了写操作,数据可以立即地、顺序地进行读取出来。

有界限的缓冲区

而现实中计算机并没有神奇的无限缓冲区。我们必须分配有限的内存空间,以供写入器和读取器之间进行或许无限的使用。在循环缓冲区中,“写索引”可以在抵达缓冲区末尾时跨越缓冲区的边界回绕到缓冲区的开始位置。当“写索引”接近缓冲区末尾并且又有新数据输入进来时,会将写操作分成两个数据块:一个写入缓冲区末尾剩余的空间,另一个将剩下的数据写入缓冲区开头。请注意,如果此时“读索引”位置接近缓冲区开头,那么这个写入操作可能会破坏尚未被读取器处理的数据。由于这个原因,“写索引”在被回绕后不允许超过“读索引”。

这样下来,可能会得到两种内存分布情况:

  • “写索引”在前面,“读索引”在后面,即在索引移动方向上,“写索引”超前于“读索引”,已写入但未被读取器处理的有效数据位于“读索引”和“写索引”之间的缓冲区内;
  • “读索引”在前面,“写索引”在后面,即在索引移动方向上,“读索引”超前于“写索引”,有效数据开始于“读索引”,结束于缓冲区末尾,并回绕到缓冲区开头直至“写索引”位置。

注意:上述第二种情况下,“写索引”和“读索引”可能存在重合,为了区分这两种情况,一般规定第二种情况下“写索引”和“读索引”不允许重合,即“写索引”位置必须落后于“读索引”一步。

因此,在循环缓冲区中,不断地从内存分布情况 1 进行到内存分布情况 2,又从 2 再回到 1,如此循环往复,当“读索引”到达缓冲区的末尾时,也回绕到缓冲区开头继续进行读取。

并发性设计

通常在使用缓冲区队列时,读数据和写数据大部分情况都是多线程或者前后台(中断)分别处理的,为了减少写入器、读取器两个线程之间或者前后台系统之间需要发生的协调,一种常见策略是,将读写变量独立出来,分别由读取器和写入器进行改变。这也简化了并发性设计中的逻辑推理,因为谁负责更改哪个变量总是很清楚。

让我们从一个简单的循环缓冲区开始,它具有原始的“写索引”和“读索引”。唯有写入器能更改“写索引”,而唯有读取器能更改“读索引”。

这样就可以不用锁进行操作,提高效率。

如何保证地址的连续性

在上述提到的有界缓冲区内存分布情况,第二种情况无法保证地址的连续性,因为有些场景需要使用到连续的内存块地址,解决这种场景的办法有:可以对缓存区进行分块,每一块固定的长度,即固定长度的队列,这样就能在一定程度上保证了地址的连续性。

代码实现

队列结构体定义先定义

一个队列结构体,包含了每个块的大小、数目、写入块索引、读取块索引等,为了解决“写索引”和“读索引”可能存在重合的两种情况,加入状态变量用来区分。

typedef?uint16_t?queuesize_t;

typedef?struct{
volatile?uint8_t?state;?/*!<?控制状态?*/

queuesize_t?end;????????/*!<?循环队列尾哨兵?*/

queuesize_t?head;???????/*!<?循环队列首哨兵?*/

queuesize_t?num;????????/*!<?循环队列中能存储的最多组数?*/

queuesize_t?size;???????/*!<?单组内存大小?*/

char??*pBufMem;?????????/*!<?Buf?指针?*/
}?QueueCtrl_t;

队列初始化

定义结构体后一定要对数据初始化,同时为接口提供一些入参变量设置队列的信息进行封装,如缓存区地址、队列的组数目、组内存大小和是否内存覆盖等信息。

/**
*?@brief??????队列控制初始化,?可采用的是动态/静态内存分配方式
*
*?@param[in,out]?pCtrl?队列控制句柄
*?@param[in]??buf??????buf?地址
*?@param[in]??queueNum?队列数目大小
*?@param[in]??size?????队列中单个内存大小
*?@param[in]??isCover??false,不覆盖;?true,队列满了覆盖未读取的最早旧数据
*?@return?????0,成功;?-1,失败
*/
int?Queue_Init(QueueCtrl_t?*pCtrl,?const?void?*pBufMem,?queuesize_t?queueNum,?queuesize_t?size,?bool?isCover)
{
if?(pCtrl?==?NULL?||?pBufMem?==?NULL?||?queueNum?==?0?||?size?==?0)
{
return?-1;
}

pCtrl->end?????=?0;
pCtrl->head????=?0;
pCtrl->pBufMem?=?(char?*)pBufMem;
pCtrl->num??=?queueNum;
pCtrl->size????=?size;
pCtrl->state???=?0x00;

if?(isCover)
{
QUEUE_STATE_SET(pCtrl,?QUEUE_ENABLE_COVER);
}

return?0;
}

队列写操作

队列都是在末端增加数据,因为队列组的大小已经固定,因此在写操作数据时可以省略新数据的内存大小。

/**
*?@brief??????在队列末尾加入新的数据
*
*?@param[in,out]?pCtrl?队列控制句柄
*?@param[in]?????src???新的数据
*?@retval?????返回的值含义如下
*?????????????@arg?0:?写入成功
*?????????????@arg?-1:?写入失败
*/
extern?int?Queue_Push(QueueCtrl_t?*pCtrl,?const?void?*src)
{
if?(pCtrl?==?NULL?||?src?==?NULL)
{
return?-1;
}

if?(IS_QUEUE_STATE_SET(pCtrl,?QUEUE_DISABLE_PUSH))
{
return?-1;
}

memcpy(&pCtrl->pBufMem[pCtrl->end?*?pCtrl->size],?src,?pCtrl->size);
pCtrl->end++;
QUEUE_STATE_SET(pCtrl,?QUEUE_EXIT_DATA);

if?((pCtrl->end)?>=?(pCtrl->num))
{
(pCtrl->end)?=?0;
}

if?(IS_QUEUE_STATE_SET(pCtrl,?QUEUE_DATA_FULL))
{
(pCtrl->head)?=?(pCtrl->end);
}
else?if?((pCtrl->end)?==?(pCtrl->head))
{
QUEUE_STATE_SET(pCtrl,?QUEUE_DATA_FULL);

if?(!IS_QUEUE_STATE_SET(pCtrl,?QUEUE_ENABLE_COVER))
{
QUEUE_STATE_SET(pCtrl,?QUEUE_DISABLE_PUSH);
}
}

return?0;
}

队列读操作

队列都是在顶端读取数据,因为队列组的大小已经固定,因此在都操作数据时可以省略数据读取存入的内存大小(必须保证读取的内存大小足够)。

/**
*?@brief??????读取并弹出队列顶端的数据
*
*?@param[in,out]?pCtrl?队列控制句柄
*?@param[in]?????dest??读取的数据
*?@retval?????返回的值含义如下
*?????????????@arg?0:?成功
*?????????????@arg?-1:?失败
*/
int?Queue_Pop(QueueCtrl_t?*pCtrl,?void?*dest)
{
if?(pCtrl?==?NULL)
{
return?-1;
}

if?(!IS_QUEUE_STATE_SET(pCtrl,?QUEUE_EXIT_DATA))
{
return?-1;
}

memcpy((char?*)dest,?&pCtrl->pBufMem[pCtrl->head?*?pCtrl->size],?pCtrl->size);
pCtrl->head++;

if?((pCtrl->head)?>=?(pCtrl->num))
{
pCtrl->head?=?0;
}

if?((pCtrl->head)?==?(pCtrl->end))
{
if?(!IS_QUEUE_STATE_SET(pCtrl,?QUEUE_DATA_FULL))
{
QUEUE_STATE_CLEAR(pCtrl,?QUEUE_EXIT_DATA);
}
}

QUEUE_STATE_CLEAR(pCtrl,?QUEUE_DISABLE_PUSH);
QUEUE_STATE_CLEAR(pCtrl,?QUEUE_DATA_FULL);

return?0;
}

相关推荐