Skip to content

Commit

Permalink
chore(no-torrent)_: Replaced entirely LogStdout with default logger
Browse files Browse the repository at this point in the history
  • Loading branch information
Samyoul committed Jun 7, 2024
1 parent 210c8bd commit a9c7db6
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 103 deletions.
1 change: 0 additions & 1 deletion protocol/communities/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ type ArchiveContract interface {
type TorrentContract interface {
ArchiveContract

LogStdout(string, ...zap.Field)
SetOnline(bool)
SetTorrentConfig(*params.TorrentConfig)
StartTorrentClient() error
Expand Down
41 changes: 16 additions & 25 deletions protocol/communities/manager_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,25 @@ import (
type ArchiveManager struct {
torrentConfig *params.TorrentConfig

logger *zap.Logger
stdoutLogger *zap.Logger

logger *zap.Logger
persistence *Persistence
identity *ecdsa.PrivateKey
encryptor *encryption.Protocol

publisher Publisher
}

func NewArchiveManager(torrentConfig *params.TorrentConfig, logger, stdoutLogger *zap.Logger, persistence *Persistence, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) *ArchiveManager {
func NewArchiveManager(torrentConfig *params.TorrentConfig, logger *zap.Logger, persistence *Persistence, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) *ArchiveManager {
return &ArchiveManager{
torrentConfig: torrentConfig,
logger: logger,
stdoutLogger: stdoutLogger,
persistence: persistence,
identity: identity,
encryptor: encryptor,
publisher: publisher,
}
}

// LogStdout appears to be some kind of debug tool specifically for torrent functionality
func (m *ArchiveManager) LogStdout(msg string, fields ...zap.Field) {
m.stdoutLogger.Info(msg, fields...)
m.logger.Debug(msg, fields...)
}

func (m *ArchiveManager) createHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {

loadFromDB := len(msgs) == 0
Expand Down Expand Up @@ -114,7 +105,7 @@ func (m *ArchiveManager) createHistoryArchiveTorrent(communityID types.HexBytes,
CommunityID: communityID.String(),
}})

m.LogStdout("creating archives",
m.logger.Debug("creating archives",
zap.Any("startDate", startDate),
zap.Any("endDate", endDate),
zap.Duration("partition", partition),
Expand All @@ -123,7 +114,7 @@ func (m *ArchiveManager) createHistoryArchiveTorrent(communityID types.HexBytes,
if from.Equal(endDate) || from.After(endDate) {
break
}
m.LogStdout("creating message archive",
m.logger.Debug("creating message archive",
zap.Any("from", from),
zap.Any("to", to),
)
Expand All @@ -144,7 +135,7 @@ func (m *ArchiveManager) createHistoryArchiveTorrent(communityID types.HexBytes,

if len(messages) == 0 {
// No need to create an archive with zero messages
m.LogStdout("no messages in this partition")
m.logger.Debug("no messages in this partition")
from = to
to = to.Add(partition)
if to.After(endDate) {
Expand All @@ -153,7 +144,7 @@ func (m *ArchiveManager) createHistoryArchiveTorrent(communityID types.HexBytes,
continue
}

m.LogStdout("creating archive with messages", zap.Int("messagesCount", len(messages)))
m.logger.Debug("creating archive with messages", zap.Int("messagesCount", len(messages)))

// Not only do we partition messages, we also chunk them
// roughly by size, such that each chunk will not exceed a given
Expand Down Expand Up @@ -308,7 +299,7 @@ func (m *ArchiveManager) createHistoryArchiveTorrent(communityID types.HexBytes,
return archiveIDs, err
}

m.LogStdout("torrent created", zap.Any("from", startDate.Unix()), zap.Any("to", endDate.Unix()))
m.logger.Debug("torrent created", zap.Any("from", startDate.Unix()), zap.Any("to", endDate.Unix()))

m.publisher.publish(&Subscription{
HistoryArchivesCreatedSignal: &signal.HistoryArchivesCreatedSignal{
Expand All @@ -318,7 +309,7 @@ func (m *ArchiveManager) createHistoryArchiveTorrent(communityID types.HexBytes,
},
})
} else {
m.LogStdout("no archives created")
m.logger.Debug("no archives created")
m.publisher.publish(&Subscription{
NoHistoryArchivesCreatedSignal: &signal.NoHistoryArchivesCreatedSignal{
CommunityID: communityID.String(),
Expand Down Expand Up @@ -433,22 +424,22 @@ func (m *ArchiveManager) ExtractMessagesFromHistoryArchive(communityID types.Hex
}
defer dataFile.Close()

m.LogStdout("extracting messages from history archive",
m.logger.Debug("extracting messages from history archive",
zap.String("communityID", communityID.String()),
zap.String("archiveID", archiveID))
metadata := index.Archives[archiveID]

_, err = dataFile.Seek(int64(metadata.Offset), 0)
if err != nil {
m.LogStdout("failed to seek archive data file", zap.Error(err))
m.logger.Error("failed to seek archive data file", zap.Error(err))
return nil, err
}

data := make([]byte, metadata.Size-metadata.Padding)
m.LogStdout("loading history archive data into memory", zap.Float64("data_size_MB", float64(metadata.Size-metadata.Padding)/1024.0/1024.0))
m.logger.Debug("loading history archive data into memory", zap.Float64("data_size_MB", float64(metadata.Size-metadata.Padding)/1024.0/1024.0))
_, err = dataFile.Read(data)
if err != nil {
m.LogStdout("failed failed to read archive data", zap.Error(err))
m.logger.Error("failed failed to read archive data", zap.Error(err))
return nil, err
}

Expand All @@ -460,23 +451,23 @@ func (m *ArchiveManager) ExtractMessagesFromHistoryArchive(communityID types.Hex
var protocolMessage encryption.ProtocolMessage
err := proto.Unmarshal(data, &protocolMessage)
if err != nil {
m.LogStdout("failed to unmarshal protocol message", zap.Error(err))
m.logger.Error("failed to unmarshal protocol message", zap.Error(err))
return nil, err
}

pk, err := crypto.DecompressPubkey(communityID)
if err != nil {
m.logger.Debug("failed to decompress community pubkey", zap.Error(err))
m.logger.Error("failed to decompress community pubkey", zap.Error(err))
return nil, err
}
decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0))
if err != nil {
m.LogStdout("failed to decrypt message archive", zap.Error(err))
m.logger.Error("failed to decrypt message archive", zap.Error(err))
return nil, err
}
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive)
if err != nil {
m.LogStdout("failed to unmarshal message archive", zap.Error(err))
m.logger.Error("failed to unmarshal message archive", zap.Error(err))
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/communities/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *ManagerSuite) buildManagers(ownerVerifier OwnerVerifier) (*Manager, Tor
s.Require().NoError(m.Start())

tc := buildTorrentConfig()
t, err := NewTorrentManager(&tc, logger, m.GetPersistence(), nil, key, nil, m)
t := NewTorrentManager(&tc, logger, m.GetPersistence(), nil, key, nil, m)
s.Require().NoError(err)

return m, t
Expand Down
64 changes: 28 additions & 36 deletions protocol/communities/manager_torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ type TorrentManager struct {
historyArchiveTasksWaitGroup sync.WaitGroup
historyArchiveTasks sync.Map // stores `chan struct{}`

logger *zap.Logger
stdoutLogger *zap.Logger

logger *zap.Logger
persistence *Persistence
transport *transport.Transport
identity *ecdsa.PrivateKey
Expand All @@ -79,36 +77,30 @@ type TorrentManager struct {
// In this case this version of NewTorrentManager will return the full Desktop TorrentManager ensuring that the
// build command will import and build the torrent deps for the Desktop OSes.
// NOTE: It is intentional that this file contains the identical function name as in "manager_torrent_mobile.go"
func NewTorrentManager(torrentConfig *params.TorrentConfig, logger *zap.Logger, persistence *Persistence, transport *transport.Transport, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) (TorrentContract, error) {
stdoutLogger, err := zap.NewDevelopment()
if err != nil {
return nil, fmt.Errorf("failed to create archive logger %w", err)
}

func NewTorrentManager(torrentConfig *params.TorrentConfig, logger *zap.Logger, persistence *Persistence, transport *transport.Transport, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) *TorrentManager {
return &TorrentManager{
torrentConfig: torrentConfig,
torrentTasks: make(map[string]metainfo.Hash),
historyArchiveDownloadTasks: make(map[string]*HistoryArchiveDownloadTask),

logger: logger,
stdoutLogger: stdoutLogger,
logger: logger,

persistence: persistence,
transport: transport,
identity: identity,
encryptor: encryptor,

publisher: publisher,
ArchiveManager: NewArchiveManager(torrentConfig, logger, stdoutLogger, persistence, identity, encryptor, publisher),
}, nil
ArchiveManager: NewArchiveManager(torrentConfig, logger, persistence, identity, encryptor, publisher),
}
}

func (m *TorrentManager) SetOnline(online bool) {
if online {
if m.torrentConfig != nil && m.torrentConfig.Enabled && !m.torrentClientStarted() {
err := m.StartTorrentClient()
if err != nil {
m.LogStdout("couldn't start torrent client", zap.Error(err))
m.logger.Error("couldn't start torrent client", zap.Error(err))
}
}
}
Expand Down Expand Up @@ -270,7 +262,7 @@ func (m *TorrentManager) getLastMessageArchiveEndDate(communityID types.HexBytes
func (m *TorrentManager) GetHistoryArchivePartitionStartTimestamp(communityID types.HexBytes) (uint64, error) {
filters, err := m.GetCommunityChatsFilters(communityID)
if err != nil {
m.LogStdout("failed to get community chats filters", zap.Error(err))
m.logger.Error("failed to get community chats filters", zap.Error(err))
return 0, err
}

Expand All @@ -289,7 +281,7 @@ func (m *TorrentManager) GetHistoryArchivePartitionStartTimestamp(communityID ty

lastArchiveEndDateTimestamp, err := m.getLastMessageArchiveEndDate(communityID)
if err != nil {
m.LogStdout("failed to get last archive end date", zap.Error(err))
m.logger.Error("failed to get last archive end date", zap.Error(err))
return 0, err
}

Expand All @@ -300,13 +292,13 @@ func (m *TorrentManager) GetHistoryArchivePartitionStartTimestamp(communityID ty
// this community
lastArchiveEndDateTimestamp, err = m.getOldestWakuMessageTimestamp(topics)
if err != nil {
m.LogStdout("failed to get oldest waku message timestamp", zap.Error(err))
m.logger.Error("failed to get oldest waku message timestamp", zap.Error(err))
return 0, err
}
if lastArchiveEndDateTimestamp == 0 {
// This means there's no waku message stored for this community so far
// (even after requesting possibly missed messages), so no messages exist yet that can be archived
m.LogStdout("can't find valid `lastArchiveEndTimestamp`")
m.logger.Debug("can't find valid `lastArchiveEndTimestamp`")
return 0, nil
}
}
Expand All @@ -326,7 +318,7 @@ func (m *TorrentManager) CreateAndSeedHistoryArchive(communityID types.HexBytes,
func (m *TorrentManager) StartHistoryArchiveTasksInterval(community *Community, interval time.Duration) {
id := community.IDString()
if _, exists := m.historyArchiveTasks.Load(id); exists {
m.LogStdout("history archive tasks interval already in progress", zap.String("id", id))
m.logger.Error("history archive tasks interval already in progress", zap.String("id", id))
return
}

Expand All @@ -337,27 +329,27 @@ func (m *TorrentManager) StartHistoryArchiveTasksInterval(community *Community,
ticker := time.NewTicker(interval)
defer ticker.Stop()

m.LogStdout("starting history archive tasks interval", zap.String("id", id))
m.logger.Debug("starting history archive tasks interval", zap.String("id", id))
for {
select {
case <-ticker.C:
m.LogStdout("starting archive task...", zap.String("id", id))
m.logger.Debug("starting archive task...", zap.String("id", id))
lastArchiveEndDateTimestamp, err := m.GetHistoryArchivePartitionStartTimestamp(community.ID())
if err != nil {
m.LogStdout("failed to get last archive end date", zap.Error(err))
m.logger.Error("failed to get last archive end date", zap.Error(err))
continue
}

if lastArchiveEndDateTimestamp == 0 {
// This means there are no waku messages for this community,
// so nothing to do here
m.LogStdout("couldn't determine archive start date - skipping")
m.logger.Debug("couldn't determine archive start date - skipping")
continue
}

topics, err := m.GetCommunityChatsTopics(community.ID())
if err != nil {
m.LogStdout("failed to get community chat topics ", zap.Error(err))
m.logger.Error("failed to get community chat topics ", zap.Error(err))
continue
}

Expand All @@ -367,7 +359,7 @@ func (m *TorrentManager) StartHistoryArchiveTasksInterval(community *Community,

err = m.CreateAndSeedHistoryArchive(community.ID(), topics, lastArchiveEndDate, to, interval, community.Encrypted())
if err != nil {
m.LogStdout("failed to create and seed history archive", zap.Error(err))
m.logger.Error("failed to create and seed history archive", zap.Error(err))
continue
}
case <-cancel:
Expand Down Expand Up @@ -436,7 +428,7 @@ func (m *TorrentManager) SeedHistoryArchiveTorrent(communityID types.HexBytes) e

magnetLink := metaInfo.Magnet(nil, &info).String()

m.LogStdout("seeding torrent", zap.String("id", id), zap.String("magnetLink", magnetLink))
m.logger.Debug("seeding torrent", zap.String("id", id), zap.String("magnetLink", magnetLink))
return nil
}

Expand Down Expand Up @@ -500,12 +492,12 @@ func (m *TorrentManager) DownloadHistoryArchivesByMagnetlink(communityID types.H
m.torrentTasks[id] = ml.InfoHash
timeout := time.After(20 * time.Second)

m.LogStdout("fetching torrent info", zap.String("magnetlink", magnetlink))
m.logger.Debug("fetching torrent info", zap.String("magnetlink", magnetlink))
select {
case <-timeout:
return nil, ErrTorrentTimedout
case <-cancelTask:
m.LogStdout("cancelled fetching torrent info")
m.logger.Debug("cancelled fetching torrent info")
downloadTaskInfo.Cancelled = true
return downloadTaskInfo, nil
case <-torrent.GotInfo():
Expand All @@ -521,14 +513,14 @@ func (m *TorrentManager) DownloadHistoryArchivesByMagnetlink(communityID types.H
indexFile := files[i]
indexFile.Download()

m.LogStdout("downloading history archive index")
m.logger.Debug("downloading history archive index")
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-cancelTask:
m.LogStdout("cancelled downloading archive index")
m.logger.Debug("cancelled downloading archive index")
downloadTaskInfo.Cancelled = true
return downloadTaskInfo, nil
case <-ticker.C:
Expand All @@ -545,7 +537,7 @@ func (m *TorrentManager) DownloadHistoryArchivesByMagnetlink(communityID types.H
}

if len(existingArchiveIDs) == len(index.Archives) {
m.LogStdout("download cancelled, no new archives")
m.logger.Debug("download cancelled, no new archives")
return downloadTaskInfo, nil
}

Expand Down Expand Up @@ -586,8 +578,8 @@ func (m *TorrentManager) DownloadHistoryArchivesByMagnetlink(communityID types.H
endIndex := startIndex + int(metadata.Size)/pieceLength

downloadMsg := fmt.Sprintf("downloading data for message archive (%d/%d)", downloadTaskInfo.TotalDownloadedArchivesCount+1, downloadTaskInfo.TotalArchivesCount)
m.LogStdout(downloadMsg, zap.String("hash", hash))
m.LogStdout("pieces (start, end)", zap.Any("startIndex", startIndex), zap.Any("endIndex", endIndex-1))
m.logger.Debug(downloadMsg, zap.String("hash", hash))
m.logger.Debug("pieces (start, end)", zap.Any("startIndex", startIndex), zap.Any("endIndex", endIndex-1))
torrent.DownloadPieces(startIndex, endIndex)

piecesCompleted := make(map[int]bool)
Expand Down Expand Up @@ -615,15 +607,15 @@ func (m *TorrentManager) DownloadHistoryArchivesByMagnetlink(communityID types.H
break downloadLoop
}
case <-cancelTask:
m.LogStdout("downloading archive data interrupted")
m.logger.Debug("downloading archive data interrupted")
downloadTaskInfo.Cancelled = true
return downloadTaskInfo, nil
}
}
downloadTaskInfo.TotalDownloadedArchivesCount++
err = m.persistence.SaveMessageArchiveID(communityID, hash)
if err != nil {
m.LogStdout("couldn't save message archive ID", zap.Error(err))
m.logger.Error("couldn't save message archive ID", zap.Error(err))
continue
}
m.publisher.publish(&Subscription{
Expand All @@ -639,7 +631,7 @@ func (m *TorrentManager) DownloadHistoryArchivesByMagnetlink(communityID types.H
CommunityID: communityID.String(),
},
})
m.LogStdout("finished downloading archives")
m.logger.Debug("finished downloading archives")
return downloadTaskInfo, nil
}
}
Expand Down
Loading

0 comments on commit a9c7db6

Please sign in to comment.