Skip to content

Commit

Permalink
fix serialize / deserialize
Browse files Browse the repository at this point in the history
Mimetis committed Dec 5, 2022

Verified

This commit was signed with the committer’s verified signature.
matux Matias Pequeno
1 parent 7e87c19 commit f68290c
Showing 11 changed files with 159 additions and 245 deletions.
5 changes: 5 additions & 0 deletions Projects/Dotmim.Sync.Core/Interceptors/Interceptors.cs
Original file line number Diff line number Diff line change
@@ -27,6 +27,11 @@ public List<InterceptorWrapper<T>> GetInterceptors<T>() where T : ProgressArgs
return (List<InterceptorWrapper<T>>)syncInterceptors;
}

/// <summary>
/// Returns a boolean value indicating if we have any interceptors for the current type T
/// </summary>
public bool HasInterceptors<T>() where T : ProgressArgs => this.GetInterceptors<T>().Any();

/// <summary>
/// Remove all interceptors based on type of ProgressArgs
/// </summary>
Original file line number Diff line number Diff line change
@@ -64,8 +64,8 @@ internal virtual async Task InternalApplyCleanErrorsAsync(ScopeInfo scopeInfo, S
if (tableBpis == null || !tableBpis.Any())
continue;

var localSerializerReader = new LocalJsonSerializer();
var localSerializerWriter = new LocalJsonSerializer();
var localSerializerReader = new LocalJsonSerializer(this, context);
var localSerializerWriter = new LocalJsonSerializer(this, context);

// Load in memory failed rows for this table
var failedRows = new List<SyncRow>();
@@ -321,19 +321,19 @@ internal virtual async Task<Exception> InternalApplyTableChangesAsync(ScopeInfo

TableChangesApplied tableChangesApplied = null;

var localSerializer = new LocalJsonSerializer();

// If someone has an interceptor on deserializing, we read the row and intercept
var interceptorsReading = this.interceptors.GetInterceptors<DeserializingRowArgs>();
if (interceptorsReading.Count > 0)
{
localSerializer.OnReadingRow(async (schemaTable, rowString) =>
{
var args = new DeserializingRowArgs(context, schemaTable, rowString);
await this.InterceptAsync(args, progress, cancellationToken).ConfigureAwait(false);
return args.Result;
});
}
var localSerializer = new LocalJsonSerializer(this, context);

//// If someone has an interceptor on deserializing, we read the row and intercept
//var interceptorsReading = this.interceptors.GetInterceptors<DeserializingRowArgs>();
//if (interceptorsReading.Count > 0)
//{
// localSerializer.OnReadingRow(async (schemaTable, rowString) =>
// {
// var args = new DeserializingRowArgs(context, schemaTable, rowString);
// await this.InterceptAsync(args, progress, cancellationToken).ConfigureAwait(false);
// return args.Result;
// });
//}

// Failure exception if any
Exception failureException = null;
Original file line number Diff line number Diff line change
@@ -155,7 +155,7 @@ public partial class LocalOrchestrator : BaseOrchestrator
if (!table.HasRows)
continue;

var localSerializer = new LocalJsonSerializer();
var localSerializer = new LocalJsonSerializer(this, context);

var (filePath, fileName) = errorsBatchInfo.GetNewBatchPartInfoPath(table, batchIndex, "json", info);
var batchPartInfo = new BatchPartInfo(fileName, table.TableName, table.SchemaName, table.Rows.Count, batchIndex);
Original file line number Diff line number Diff line change
@@ -146,7 +146,7 @@ public partial class RemoteOrchestrator : BaseOrchestrator
if (!table.HasRows)
continue;

var localSerializer = new LocalJsonSerializer();
var localSerializer = new LocalJsonSerializer(this, context);

var (filePath, fileName) = errorsBatchInfo.GetNewBatchPartInfoPath(table, batchIndex, "json", info);
var batchPartInfo = new BatchPartInfo(fileName, table.TableName, table.SchemaName, table.Rows.Count, batchIndex);
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ public abstract partial class BaseOrchestrator
/// </returns>
public virtual List<BatchInfo> LoadBatchInfos()
{
var localSerializer = new LocalJsonSerializer();
var localSerializer = new LocalJsonSerializer(this, new SyncContext(Guid.NewGuid(), SyncOptions.DefaultScopeName));

var directoryInfo = new DirectoryInfo(this.Options.BatchDirectory);

@@ -94,18 +94,17 @@ public virtual IEnumerable<SyncTable> LoadTablesFromBatchInfo(string scopeName,
var context = new SyncContext(Guid.NewGuid(), scopeName);
var bpiGroupedTables = batchInfo.BatchPartsInfo.GroupBy(st => st.TableName + st.SchemaName);

var localSerializer = new LocalJsonSerializer();
var localSerializer = new LocalJsonSerializer(this, context);

var interceptorsReading = this.interceptors.GetInterceptors<DeserializingRowArgs>();
if (interceptorsReading.Count > 0)
{
localSerializer.OnReadingRow(async (schemaTable, rowString) =>
{
var args = new DeserializingRowArgs(context, schemaTable, rowString);
await this.InterceptAsync(args).ConfigureAwait(false);
return args.Result;
});
}
//if (this.HasInterceptors<DeserializingRowArgs>())
//{
// localSerializer.OnReadingRow(async (schemaTable, rowString) =>
// {
// var args = new DeserializingRowArgs(context, schemaTable, rowString);
// await this.InterceptAsync(args).ConfigureAwait(false);
// return args.Result;
// });
//}

SyncTable currentTable = null;

@@ -195,18 +194,18 @@ public virtual SyncTable LoadTableFromBatchInfo(string scopeName, BatchInfo batc

internal SyncTable InternalLoadTableFromBatchInfo(SyncContext context, BatchInfo batchInfo, string tableName, string schemaName = default, SyncRowState? syncRowState = default)
{
var localSerializer = new LocalJsonSerializer();

var interceptorsReading = this.interceptors.GetInterceptors<DeserializingRowArgs>();
if (interceptorsReading.Count > 0)
{
localSerializer.OnReadingRow(async (schemaTable, rowString) =>
{
var args = new DeserializingRowArgs(context, schemaTable, rowString);
await this.InterceptAsync(args).ConfigureAwait(false);
return args.Result;
});
}
var localSerializer = new LocalJsonSerializer(this, context);

//var interceptorsReading = this.interceptors.GetInterceptors<DeserializingRowArgs>();
//if (interceptorsReading.Count > 0)
//{
// localSerializer.OnReadingRow(async (schemaTable, rowString) =>
// {
// var args = new DeserializingRowArgs(context, schemaTable, rowString);
// await this.InterceptAsync(args).ConfigureAwait(false);
// return args.Result;
// });
//}
SyncTable syncTable = null;

// Gets all BPI containing this table
@@ -264,18 +263,18 @@ internal SyncTable InternalLoadTableFromBatchPartInfo(SyncContext context, strin
if (!File.Exists(fullPath))
return null;

var localSerializer = new LocalJsonSerializer();
var localSerializer = new LocalJsonSerializer(this, context);

var interceptorsReading = this.interceptors.GetInterceptors<DeserializingRowArgs>();
if (interceptorsReading.Count > 0)
{
localSerializer.OnReadingRow(async (schemaTable, rowString) =>
{
var args = new DeserializingRowArgs(context, schemaTable, rowString);
await this.InterceptAsync(args).ConfigureAwait(false);
return args.Result;
});
}
//var interceptorsReading = this.interceptors.GetInterceptors<DeserializingRowArgs>();
//if (interceptorsReading.Count > 0)
//{
// localSerializer.OnReadingRow(async (schemaTable, rowString) =>
// {
// var args = new DeserializingRowArgs(context, schemaTable, rowString);
// await this.InterceptAsync(args).ConfigureAwait(false);
// return args.Result;
// });
//}

// Get table from file
var (syncTable, rowsCount) = localSerializer.GetSchemaTableFromFile(fullPath);
@@ -317,7 +316,7 @@ public virtual Task SaveTableToBatchPartInfoAsync(string scopeName, BatchInfo ba

internal async Task InternalSaveTableToBatchPartInfoAsync(SyncContext context, BatchInfo batchInfo, BatchPartInfo batchPartInfo, SyncTable syncTable)
{
var localSerializer = new LocalJsonSerializer();
var localSerializer = new LocalJsonSerializer(this, context);

// Get full path of my batchpartinfo
var fullPath = batchInfo.GetBatchPartInfoPath(batchPartInfo).FullPath;
@@ -327,16 +326,16 @@ internal async Task InternalSaveTableToBatchPartInfoAsync(SyncContext context, B

if (syncTable?.Rows != null && syncTable.Rows.Count <= 0)
{
var interceptorsWriting = this.interceptors.GetInterceptors<SerializingRowArgs>();
if (interceptorsWriting.Count > 0)
{
localSerializer.OnWritingRow(async (schemaTable, rowArray) =>
{
var args = new SerializingRowArgs(context, schemaTable, rowArray);
await this.InterceptAsync(args).ConfigureAwait(false);
return args.Result;
});
}
//var interceptorsWriting = this.interceptors.GetInterceptors<SerializingRowArgs>();
//if (interceptorsWriting.Count > 0)
//{
// localSerializer.OnWritingRow(async (schemaTable, rowArray) =>
// {
// var args = new SerializingRowArgs(context, schemaTable, rowArray);
// await this.InterceptAsync(args).ConfigureAwait(false);
// return args.Result;
// });
//}
// open the file and write table header
await localSerializer.OpenFileAsync(fullPath, syncTable).ConfigureAwait(false);

Original file line number Diff line number Diff line change
@@ -192,22 +192,22 @@ await schemaTables.ForEachAsync(async syncTable =>
// numbers of batch files generated
var batchIndex = -1;

var localSerializerModified = new LocalJsonSerializer();
var localSerializerDeleted = new LocalJsonSerializer();

var interceptorsWriting = this.interceptors.GetInterceptors<SerializingRowArgs>();
if (interceptorsWriting.Count > 0)
{
localSerializerModified.OnWritingRow(async (syncTable, rowArray) =>
{
var copyArray = new object[rowArray.Length];
Array.Copy(rowArray, copyArray, rowArray.Length);

var args = new SerializingRowArgs(context, syncTable, copyArray);
await this.InterceptAsync(args, progress, cancellationToken).ConfigureAwait(false);
return args.Result;
});
}
var localSerializerModified = new LocalJsonSerializer(this, context);
var localSerializerDeleted = new LocalJsonSerializer(this, context);

//var interceptorsWriting = this.interceptors.GetInterceptors<SerializingRowArgs>();
//if (interceptorsWriting.Count > 0)
//{
// localSerializerModified.OnWritingRow(async (syncTable, rowArray) =>
// {
// var copyArray = new object[rowArray.Length];
// Array.Copy(rowArray, copyArray, rowArray.Length);

// var args = new SerializingRowArgs(context, syncTable, copyArray);
// await this.InterceptAsync(args, progress, cancellationToken).ConfigureAwait(false);
// return args.Result;
// });
//}

string batchPartInfoFullPathModified = null, batchPartFileNameModified = null;
string batchPartInfoFullPathDeleted = null, batchPartFileNameDeleted = null;
28 changes: 26 additions & 2 deletions Projects/Dotmim.Sync.Core/Serialization/LocalJsonSerializer.cs
Original file line number Diff line number Diff line change
@@ -16,12 +16,36 @@ namespace Dotmim.Sync.Serialization
public class LocalJsonSerializer
{


private StreamWriter sw;
private JsonTextWriter writer;
private Func<SyncTable, object[], Task<string>> writingRowAsync;
private Func<SyncTable, string, Task<object[]>> readingRowAsync;


public LocalJsonSerializer()
{

}

public LocalJsonSerializer(BaseOrchestrator orchestrator, SyncContext context)
{
if (orchestrator.HasInterceptors<DeserializingRowArgs>())
this.OnReadingRow(async (schemaTable, rowString) =>
{
var args = new DeserializingRowArgs(context, schemaTable, rowString);
await orchestrator.InterceptAsync(args).ConfigureAwait(false);
return args.Result;
});

if (orchestrator.HasInterceptors<SerializingRowArgs>())
this.OnWritingRow(async (schemaTable, rowArray) =>
{
var args = new SerializingRowArgs(context, schemaTable, rowArray);
await orchestrator.InterceptAsync(args).ConfigureAwait(false);
return args.Result;
});
}

/// <summary>
/// Returns if the file is opened
/// </summary>
@@ -286,7 +310,7 @@ public IEnumerable<SyncRow> GetRowsFromFile(string path, SyncTable schemaTable)
if (this.readingRowAsync != null)
{
var jArray = serializer.Deserialize<JArray>(reader);
var value = jArray.ToString();
var value = jArray[0].Value<string>();
array = this.readingRowAsync(schemaTable, value).GetAwaiter().GetResult();
}
else
Loading

0 comments on commit f68290c

Please sign in to comment.