Skip to content

Commit

Permalink
feat:订阅/发布 功能
Browse files Browse the repository at this point in the history
  • Loading branch information
yushaona committed Jan 7, 2024
1 parent 5e018ae commit e1f0efa
Show file tree
Hide file tree
Showing 12 changed files with 695 additions and 35 deletions.
8 changes: 8 additions & 0 deletions abstract/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,12 @@ type Connection interface {
SetDBIndex(int)
SetPassword(string)
GetPassword() string
Write([]byte) (int, error)

IsClosed() bool
// pub/sub
Subscribe(channel string)
Unsubscribe(channel string)
SubCount() int
GetChannels() []string
}
36 changes: 4 additions & 32 deletions datastruct/dict/concurrent.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package dict

import (
"math"
"sync"
"sync/atomic"

"github.com/gofish2020/easyredis/utils"
)

// 并发安全的字典
Expand All @@ -30,38 +31,9 @@ func (sh *shard) forEach(consumer Consumer) bool {
return true
}

// 计算比param参数大,并满足是2的N次幂, 最近接近param的数值size
func computeCapacity(param int) (size int) {
if param <= 16 {
return 16
}
n := param - 1
n |= n >> 1
n |= n >> 2
n |= n >> 4
n |= n >> 8
n |= n >> 16
if n < 0 {
return math.MaxInt32
}
return n + 1
}

// 计算key的hashcode
const prime32 = uint32(16777619)

func fnv32(key string) uint32 {
hash := uint32(2166136261)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}

// 构造字典对象
func NewConcurrentDict(shardCount int) *ConcurrentDict {
shardCount = computeCapacity(shardCount)
shardCount = utils.ComputeCapacity(shardCount)

dict := &ConcurrentDict{}
shds := make([]*shard, shardCount)
Expand All @@ -84,7 +56,7 @@ func (c *ConcurrentDict) index(code uint32) uint32 {

// 获取key对应的shard
func (c *ConcurrentDict) getShard(key string) *shard {
return c.shds[c.index(fnv32(key))]
return c.shds[c.index(utils.Fnv32(key))]
}

// 获取key保存的值
Expand Down
174 changes: 174 additions & 0 deletions datastruct/list/linkedlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package list

import (
"errors"
)

type Consumer func(i int, val interface{}) bool

type Expected func(actual interface{}) bool

// 双向链表,实现增Add/删Del/改Modify/查 Get

type LinkedList struct {
first *node
last *node

size int
}

type node struct {
pre *node
next *node
val any
}

func newNode(val any) *node {

return &node{val: val}
}

// Add push new node to the tail
func (l *LinkedList) Add(val interface{}) {
n := newNode(val)

if l.last == nil { // 空链表
l.first = n
l.last = n
} else {
n.pre = l.last
l.last.next = n
l.last = n
}
l.size++
}

func (l *LinkedList) find(index int) *node {
// 要找的节点在链表的前半部分
if index < l.Len()/2 {
n := l.first
for i := 0; i < index; i++ {
n = n.next
}
return n
}
// 要找的节点在链表的后半部分
n := l.last
for i := l.Len() - 1; i > index; i-- {
n = n.pre
}
return n
}

// 获取指定索引节点的值
func (l *LinkedList) Get(index int) (any, error) {
if index < 0 || index >= l.size {
return nil, errors.New("out of range")
}
n := l.find(index)
return n.val, nil

}

// 修改指定节点的值
func (l *LinkedList) Modify(index int, val any) error {
if index < 0 || index >= l.size {
return errors.New("out of range")
}

n := l.find(index)
n.val = val
return nil
}

func (l *LinkedList) delNode(n *node) {
// n 的前驱节点
pre := n.pre
// n 的后驱节点
next := n.next

if pre != nil {
pre.next = next
} else { // 说明n就是第一个节点
l.first = next
}

if next != nil {
next.pre = pre
} else { // 说明n就是最后一个节点
l.last = pre
}

// for gc
n.pre = nil
n.next = nil

l.size--
}

// 删除指定节点
func (l *LinkedList) Del(index int) (any, error) {
if index < 0 || index >= l.size {
return nil, errors.New("out of range")
}
n := l.find(index)
l.delNode(n)
return n.val, nil
}

// 删除最后一个节点
func (l *LinkedList) DelLast() (any, error) {
if l.Len() == 0 { // do nothing
return nil, nil
}
return l.Del(l.Len() - 1)
}

// 遍历链表中的元素
func (l *LinkedList) ForEach(consumer Consumer) {
i := 0
for n := l.first; n != nil; n = n.next {
if !consumer(i, n.val) {
break
}
}
}

// 判断是否包含指定值
func (l *LinkedList) Contain(expect Expected) bool {
result := false
l.ForEach(func(index int, val interface{}) bool {
if expect(val) {
result = true
return false
}
return true
})
return result
}

// 删除链表中的指定值(所有)
func (l *LinkedList) DelAllByVal(expected Expected) int {

removed := 0
for n := l.first; n != nil; {
next := n.next
if expected(n.val) {
l.delNode(n)
removed++
}
n = next
}
return removed
}

// 链表的长度
func (l *LinkedList) Len() int {
return l.size
}

// 构建新链表
func NewLinkedList() *LinkedList {
l := &LinkedList{}
return l
}
14 changes: 14 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gofish2020/easyredis/abstract"
"github.com/gofish2020/easyredis/aof"
"github.com/gofish2020/easyredis/engine/payload"
"github.com/gofish2020/easyredis/pubhub"
"github.com/gofish2020/easyredis/redis/protocol"
"github.com/gofish2020/easyredis/tool/conf"
"github.com/gofish2020/easyredis/tool/logger"
Expand All @@ -24,6 +25,10 @@ type Engine struct {
delay *timewheel.Delay
// Append Only File
aof *aof.AOF

// 订阅

hub *pubhub.Pubhub
}

func NewEngine() *Engine {
Expand All @@ -43,6 +48,8 @@ func NewEngine() *Engine {
// 赋值到 dbSet中
engine.dbSet[i] = dbset
}

engine.hub = pubhub.NewPubsub()
// 启用AOF日志
if conf.GlobalConfig.AppendOnly {
// 创建*AOF对象
Expand Down Expand Up @@ -86,6 +93,7 @@ func (e *Engine) Exec(c abstract.Connection, redisCommand [][]byte) (result prot
result = protocol.NewUnknownErrReply()
}
}()
// 命令小写
commandName := strings.ToLower(string(redisCommand[0]))
if commandName == "ping" { // https://redis.io/commands/ping/
return Ping(redisCommand[1:])
Expand All @@ -107,6 +115,12 @@ func (e *Engine) Exec(c abstract.Connection, redisCommand [][]byte) (result prot
return protocol.NewGenericErrReply("AppendOnly is false, you can't rewrite aof file")
}
return BGRewriteAOF(e)
case "subscribe":
return e.hub.Subscribe(c, redisCommand[1:])
case "unsubscribe":
return e.hub.Unsubscribe(c, redisCommand[1:])
case "publish":
return e.hub.Publish(c, redisCommand[1:])
}

// redis 命令处理
Expand Down
Loading

0 comments on commit e1f0efa

Please sign in to comment.