Skip to content

Commit

Permalink
improve some logs and fix cluster sync to actually wait
Browse files Browse the repository at this point in the history
  • Loading branch information
rmb938 committed Mar 26, 2022
1 parent a74e045 commit 35ddb64
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pkg/kafka/client/request_packet_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type RequestPacketHandler struct {
func (rh *RequestPacketHandler) HandleRequest(client *Client, inPacket *netCodec.Packet) error {
packetReader := netCodec.NewPacketReader(inPacket)

log := rh.Log.WithValues("request_api_key", inPacket.ReqHeader.Key, "request_api_version", inPacket.ReqHeader.Version, "correlation_id", inPacket.ReqHeader.CorrelationId)
log := rh.Log.WithValues("from-address", client.conn.RemoteAddr().String(), "request_api_key", inPacket.ReqHeader.Key, "request_api_version", inPacket.ReqHeader.Version, "correlation_id", inPacket.ReqHeader.CorrelationId)

decoderMap, ok := codec.MessageDecoderMapping[inPacket.ReqHeader.Key]
if !ok {
Expand Down Expand Up @@ -58,7 +58,7 @@ func (rh *RequestPacketHandler) HandleRequest(client *Client, inPacket *netCodec
}

// TODO: figure out throttling stuff
log.Info("handling packet")
log.V(1).Info("handling packet")
respMessage, err := inHandler.Handle(client.Broker, log, reqMessage)
if err != nil {
return fmt.Errorf("error handling packet: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/kafka/logical_broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (b *Broker) InitClusters() error {
if topic == nil {
_, err := cluster1.APICreateTopic("test1", 1, 1, map[string]*string{})
if err != nil {
return err
return fmt.Errorf("err creating topic test1: %w", err)
}
}

Expand All @@ -170,7 +170,7 @@ func (b *Broker) InitClusters() error {
if topic == nil {
_, err := cluster2.APICreateTopic("test2", 1, 1, map[string]*string{})
if err != nil {
return err
return fmt.Errorf("err creating topic test2: %w", err)
}
}

Expand All @@ -182,7 +182,7 @@ func (b *Broker) InitClusters() error {
if topic == nil {
_, err := cluster3.APICreateTopic("test3", 1, 3, map[string]*string{})
if err != nil {
return err
return fmt.Errorf("err creating topic test3: %w", err)
}
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/kafka/logical_broker/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ type Cluster struct {
Name string
log logr.Logger

syncedOnce sync.Once
syncedChan chan struct{}
topicConfigSyncedOnce sync.Once
topicLeaderSyncedOnce sync.Once
syncedChan chan struct{}

controller *Controller

Expand Down Expand Up @@ -76,6 +77,8 @@ func (c *Cluster) initFranzKafkaClient(addrs []string) error {
}

func (c *Cluster) WaitSynced() {
// Two loads, one for topic configs and one for leaders
<-c.syncedChan
<-c.syncedChan
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kafka/logical_broker/cluster_consomer_topic_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *Cluster) ConsumeTopicLeaders() {
}
}

c.shouldBeSynced(highWaterMarks, lowWaterMarks)
c.shouldBeSynced(&c.topicLeaderSyncedOnce, highWaterMarks, lowWaterMarks)
}
}
}
7 changes: 4 additions & 3 deletions pkg/kafka/logical_broker/cluster_consumer_topic_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"os"
"sync"

"github.com/rmb938/krouter/pkg/kafka/logical_broker/topics"
"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -90,12 +91,12 @@ func (c *Cluster) ConsumeTopicConfigs() {
}
}

c.shouldBeSynced(highWaterMarks, lowWaterMarks)
c.shouldBeSynced(&c.topicConfigSyncedOnce, highWaterMarks, lowWaterMarks)
}
}
}

func (c *Cluster) shouldBeSynced(highWaterMarks, lowWaterMarks map[int32]int64) {
func (c *Cluster) shouldBeSynced(once *sync.Once, highWaterMarks, lowWaterMarks map[int32]int64) {
synced := true
for partitionIndex, highMark := range highWaterMarks {
if lowMark, ok := lowWaterMarks[partitionIndex]; ok {
Expand All @@ -106,7 +107,7 @@ func (c *Cluster) shouldBeSynced(highWaterMarks, lowWaterMarks map[int32]int64)
}

if synced {
c.syncedOnce.Do(func() {
once.Do(func() {
c.syncedChan <- struct{}{}
})
}
Expand Down

0 comments on commit 35ddb64

Please sign in to comment.