卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章33941本站已运行390

等待映射中的值在 Go 中可用

等待映射中的值在 go 中可用

问题内容

我有一个程序,基本上有三种情况 - 设置键的值,获取值(如果存在),或者等到给定键的值可用。我最初的想法 - 创建一个带有 map[string]interface{} 的新类型 - 其中存储“持久”值。除此之外,为了等待一个值,我计划使用 map[string](chan struct{})。当调用 set() 方法时,我会写入该通道,任何等待它的人都会知道该值在那里。

我事先不知道密钥 - 它们是随机的。我不确定如何正确实现 wait() 方法。

type Map struct {
    sync.Mutex

    m    map[string]interface{}
    wait map[string]chan (struct{})
}


func (m *Map) Set(key string, value interface{}) {
    m.ensureWaitChan(key)

    m.Lock()
    defer m.Unlock()

    m.m[key] = value

    // Signal to all waiting.
    m.wait[key] <- struct{}{}
}


func (m *Map) Wait(key string) interface{} {
    m.ensureWaitChan(key)

    m.Lock()
    
    value, ok := m.m[key]
    if ok {
        m.Unlock()
        return value
    }

    m.Unlock()
    // <------ Unlocked state where something might happen.
    <-m.wait[key]

    value := m.m[key]

    return value    
}

// If the channel does not exist for those waiting - create it.
func (m *Map) ensureWaitChan(key string) {
    m.Lock()
    defer m.Unlock()

    _, ok := m.wait[key]
    if ok {
        return
    }

    m.wait[key] = make(chan struct{}, 100)
}

问题是 - wait() 中存在竞争条件 - 在我释放互斥体之后,在我开始侦听通道上的传入值之前。

处理这个问题的最佳方法是什么?欢迎任何其他关于如何实现这一点的建议,我相信一定有更好的方法来做到这一点。我不会以固定的时间间隔或类似的方式轮询该值。


正确答案


您正在寻找的是同步映射和消息代理之间的混合。我们可以通过利用通信和同步通道来实现这一点,以便订阅者可以在消息发布后立即收到消息(如果消息尚未在缓存中)。

type Map struct {
    sync.Mutex

    m    map[string]any
    subs map[string][]chan any
}

func (m *Map) Set(key string, value any) {
    m.Lock()
    defer m.Unlock()

    m.m[key] = value

    // Send the new value to all waiting subscribers of the key
    for _, sub := range m.subs[key] {
        sub <- value
    }
    delete(m.subs, key)
}

func (m *Map) Wait(key string) any {
    m.Lock()
    // Unlock cannot be deferred so we can unblock Set() while waiting

    value, ok := m.m[key]
    if ok {
        m.Unlock()
        return value
    }

    // if there is no value yet, subscribe to any new values for this key
    ch := make(chan any)
    m.subs[key] = append(m.subs[key], ch)
    m.Unlock()

    return <-ch
}

由于订阅者在等待时必须解锁地图互斥体,因此他们无法安全地访问添加到地图中的新消息。我们通过自己的频道将新值直接发送给所有订阅者,这样我们就不需要在 set 中添加更多同步,以确保所有订阅者在解锁地图本身之前都满意。提前解锁地图将允许订阅者直接读取它,但也会允许同时插入新值,从而导致结果不一致。

正在运行的版本,还包括带有类型参数的通用 map 实现: https://go.dev /play/p/an7vrspdgmo

卓越飞翔博客
上一篇: 无法连接到 azure PostresSQL 数据库 - 用户名应采用 <用户名@主机名> 格式
下一篇: 返回列表
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏