Skip to content

Commit

Permalink
added support of different version of commands replicated between nod…
Browse files Browse the repository at this point in the history
…es (since 0x106); fixed manticoresoftware#2850

allow cluster to have master nodes (old protocol version) that commits data into cluster and other nodes these receives only data from cluster (new versions)
that allow to upgrade nodes in cluster one by one
  • Loading branch information
tomatolog committed Dec 25, 2022
1 parent 5861f8b commit 8011c2c
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
5 changes: 3 additions & 2 deletions src/accumulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,13 @@ void RtAccum_t::ResetRowID()
m_tNextRowID = 0;
}

void RtAccum_t::LoadRtTrx ( const BYTE* pData, int iLen )
void RtAccum_t::LoadRtTrx ( const BYTE * pData, int iLen, DWORD uVer )
{
MemoryReader_c tReader ( pData, iLen );
m_bReplace = !!tReader.GetVal<BYTE>();
tReader.GetVal ( m_uAccumDocs );
tReader.GetVal ( m_iAccumBytes );
if ( uVer>=0x106 )
tReader.GetVal ( m_iAccumBytes );

// insert and replace
m_dAccum.Resize ( tReader.GetDword() );
Expand Down
2 changes: 1 addition & 1 deletion src/accumulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class RtAccum_t
int GetIndexGeneration() const { return m_iIndexGeneration; }
ReplicationCommand_t * AddCommand ( ReplicationCommand_e eCmd, CSphString sIndex, CSphString sCluster = CSphString() );

void LoadRtTrx ( const BYTE * pData, int iLen );
void LoadRtTrx ( const BYTE * pData, int iLen, DWORD uVer );
void SaveRtTrx ( MemoryWriter_c & tWriter ) const;

const BYTE * GetPackedKeywords() const;
Expand Down
4 changes: 2 additions & 2 deletions src/searchdreplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1566,7 +1566,7 @@ bool ParseCmdReplicated ( const BYTE * pData, int iLen, bool bIsolated, const CS
}

WORD uVer = tReader.GetVal<WORD>();
if ( uVer!=g_iReplicateCommandVer )
if ( uVer>g_iReplicateCommandVer )
{
sphWarning ( "replication command %d, version mismatch %d, got %d", (int)eCommand, g_iReplicateCommandVer, (int)uVer );
return false;
Expand Down Expand Up @@ -1639,7 +1639,7 @@ bool ParseCmdReplicated ( const BYTE * pData, int iLen, bool bIsolated, const CS
break;

case ReplicationCommand_e::RT_TRX:
tAcc.LoadRtTrx ( pRequest, iRequestLen );
tAcc.LoadRtTrx ( pRequest, iRequestLen, uVer );
sphLogDebugRpl ( "rt trx, table '%s'", pCmd->m_sIndex.cstr() );
break;

Expand Down

0 comments on commit 8011c2c

Please sign in to comment.