Skip to content

Commit

Permalink
Changes to follow single out/cache arch.
Browse files Browse the repository at this point in the history
Added cache->out in seperate thread.
  • Loading branch information
alxnik committed Dec 31, 2012
1 parent 2458822 commit 98df5a2
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 82 deletions.
137 changes: 57 additions & 80 deletions fpd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ CLog Log;
bool debugFlag = false;
bool SigTermFlag = false;

void* RestoreThread(void *ptr);
pthread_t RestoringThread;
pthread_t DbConnectThread;

Expand All @@ -44,8 +45,9 @@ void handler(int value);

struct RestoreThreadStruct
{
CMysqlDb *MysqlDb;
CXMLDb *XMLDb;
COutput *from;
COutput *to;

};

void handler(int value)
Expand Down Expand Up @@ -257,10 +259,11 @@ int main(int argc, char *argv[])
continue;
}

Probe->outputs.insert((*CurInput).outputs.begin(), (*CurInput).outputs.end());
Probe->output = (*CurInput).output;
Probe->cache = (*CurInput).cache;

Log.log("main %s - second %s\n", (*CurInput).output.c_str(), (*CurInput).cache.c_str());
Inputs.push_back(Probe);

}

if(Inputs.empty())
Expand All @@ -286,6 +289,8 @@ int main(int argc, char *argv[])
// inited and started the parsers
// inited the output modules.
// Now we have to handle the message flow from input to output in standard intervals as defined in the settings
Log.log("Solarspy Daemon initialized. Initializing operations...\n");
RestoringFinished = true;
while(true)
{
struct tm *ptr;
Expand All @@ -305,26 +310,38 @@ int main(int argc, char *argv[])
DataContainer Results;
Results["TimeStamp"] = int2string(TimeStamp);

// By default, start with non-responsive status and work the way from there
Results["status"] = "5";
Results["inverter"] = int2string(*CurSensor);

if((*CurProbe)->GetAverage(Results) == false)
{
Log.warn("Non responsive Inverter %s\n", Results["inverter"].c_str());
Results["status"] = "5";
}


// Start running through the list of outputs until the insert suceeds
for(int i=1; ; i++)
string CurrOutput = (*CurProbe)->output;
if(Outputs[CurrOutput]->Insert(Results) != true)
{
Log.warn("Unable to output data to %s\n", CurrOutput.c_str());
CurrOutput = (*CurProbe)->cache;

if((*CurProbe)->outputs[i].compare("") == 0)
if(Outputs[CurrOutput]->Insert(Results) != true)
Log.warn("Unable to output data to cache %s\n", CurrOutput.c_str());
}
else if(RestoringFinished == true)
{
if(RestoringThread)
{
Log.error("Could not output data\n");
break;
pthread_join(RestoringThread, NULL);
RestoringThread = 0;
}
string CurrOutput = (*CurProbe)->outputs[i];
if(Outputs[CurrOutput]->Insert(Results) == true)
break;
RestoreThreadStruct ThreadStruct;
CurrOutput = (*CurProbe)->cache;
ThreadStruct.from = Outputs[CurrOutput];
CurrOutput = (*CurProbe)->output;
ThreadStruct.to = Outputs[CurrOutput];
pthread_create( &RestoringThread, NULL, &RestoreThread, &ThreadStruct );
}
}
}
Expand All @@ -339,7 +356,7 @@ int main(int argc, char *argv[])
} while(ptr->tm_sec == 0);
}
usleep(30000);
//sleep(1);

if(SigTermFlag == true)
break;
}
Expand All @@ -354,75 +371,35 @@ int main(int argc, char *argv[])
return 0;
}


string int2string(uint8_t num)
{
stringstream strstm;
unsigned int NumCast = num;
strstm << NumCast;
return strstm.str();
}
string int2string(uint16_t num)
{
stringstream strstm;
unsigned int NumCast = num;
strstm << NumCast;
return strstm.str();
}
string int2string(uint32_t num)
{
stringstream strstm;
unsigned int NumCast = num;
strstm << NumCast;
return strstm.str();
}
string int2string(uint64_t num)
{
stringstream strstm;
strstm << num;
return strstm.str();
}
string int2string(int8_t num)
{
stringstream strstm;
int NumCast = num;
strstm << NumCast;
return strstm.str();
}
string int2string(int16_t num)
{
stringstream strstm;
int NumCast = num;
strstm << NumCast;
return strstm.str();
}
string int2string(int32_t num)
void* RestoreThread(void *ptr)
{
stringstream strstm;
strstm << num;
return strstm.str();
}
string int2string(int64_t num)
{
stringstream strstm;
strstm << num;
return strstm.str();
}
RestoringFinished = false;
RestoreThreadStruct *ThreadStruct = (RestoreThreadStruct *) ptr;

#if __WORDSIZE != 64
string int2string(time_t num)
{
stringstream strstm;
strstm << num;
return strstm.str();
}
#endif
while(true)
{
DataContainerList Data2Restore;

string float2string(float num)
{
stringstream strstm;
strstm << num;
return strstm.str();
if(ThreadStruct->from->Restore(Data2Restore, 50) == false)
{
Log.error("Error Restoring from output\n");
break;
}

if(Data2Restore.size() == 0)
break;

if(ThreadStruct->to->Insert(Data2Restore) == false)
{
Log.error("Error Inserting data to database\n");
break;
}
}

RestoringFinished = true;
pthread_exit(NULL);

return NULL;
}

string stripPathFromArgv(char *argv)
Expand Down
5 changes: 3 additions & 2 deletions fpd.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


#define MAJOR_VERSION 2
#define MINOR_VERSION 6
#define MINOR_VERSION 7

#include <stdio.h>
#include <sys/types.h>
Expand Down Expand Up @@ -60,7 +60,8 @@ class CProbe
virtual list<int> GetConnectedInverters(void) = 0;
virtual int ResetStack(void) = 0;

map<int, string> outputs;
string output;
string cache;
};

// Generic class of the hardware interfaces
Expand Down

0 comments on commit 98df5a2

Please sign in to comment.