Skip to content

Commit

Permalink
getstreamkeysummary tweaks and getstreampublishersummary
Browse files Browse the repository at this point in the history
  • Loading branch information
mike31 committed Oct 19, 2017
1 parent a1f63ac commit c6facdd
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 33 deletions.
48 changes: 48 additions & 0 deletions src/rpc/rpchelp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3825,7 +3825,54 @@ void mc_InitRPCHelpMap16()
"}\n"
));

}

void mc_InitRPCHelpMap17()
{
mapHelpStrings.insert(std::make_pair("getstreamkeysummary",
"getstreamkeysummary \"stream-identifier\" \"key\" \"mode\"\n"
"\nReturns stream json object items summary for specific key.\n"
"\nArguments:\n"
"1. \"stream-identifier\" (string, required) Stream identifier - one of the following: stream txid, stream reference, stream name.\n"
"2. \"key\" (string, required) Stream key\n"
"3. \"mode\" (string, required) Comma delimited list of the following:\n"
" jsonobjectmerge - merge json objects\n"
" ignore - if non-object items found - ignore them if specified, return error otherwise\n"
" recursive - merge json objects recursively\n"
" noupdate - if specified, first appearance of the key is taken\n"
" firstpublisherany - if specified, only items published by one of original publishers will be summarized\n"
" firstpublisherall - if specified, only items published by all original publishers will be summarized\n"
" omitnull - omit keys with null values\n"
"\nResult:\n"
"summary-object (object) Summary object for specific key.\n"
"\nExamples:\n"
+ HelpExampleCli("getstreamkeysummary", "\"test-stream\" \"key01\" \"jsonobjectmerge\"")
+ HelpExampleCli("getstreamkeysummary", "\"test-stream\" \"key01\" \"jsonobjectmerge,ignore,recursive\"")
+ HelpExampleRpc("getstreamkeysummary", "\"test-stream\", \"key01\", \"jsonobjectmerge,ignore,recursive\"")
));


mapHelpStrings.insert(std::make_pair("getstreampublishersummary",
"getstreampublishersummary \"stream-identifier\" \"address\" \"mode\"\n"
"\nReturns stream json object items summary for specific publisher.\n"
"\nArguments:\n"
"1. \"stream-identifier\" (string, required) Stream identifier - one of the following: stream txid, stream reference, stream name.\n"
"2. \"address\" (string, required) Publisher address\n"
"3. \"mode\" (string, required) Comma delimited list of the following:\n"
" jsonobjectmerge - merge json objects\n"
" ignore - if non-object items found - ignore them if specified, return error otherwise\n"
" recursive - merge json objects recursively\n"
" noupdate - if specified, first appearance of the key is taken\n"
" omitnull - omit keys with null values\n"
"\nResult:\n"
"summary-object (object) Summary object for specific publisher.\n"
"\nExamples:\n"
+ HelpExampleCli("liststreampublisheritems", "\"test-stream\" \"1M72Sfpbz1BPpXFHz9m3CdqATR44Jvaydd\" \"jsonobjectmerge\"")
+ HelpExampleCli("liststreampublisheritems", "\"test-stream\" \"1M72Sfpbz1BPpXFHz9m3CdqATR44Jvaydd\" \"jsonobjectmerge,ignore,recursive\"")
+ HelpExampleRpc("liststreampublisheritems", "\"test-stream\", \"1M72Sfpbz1BPpXFHz9m3CdqATR44Jvaydd\", \"jsonobjectmerge,ignore,recursive\"")
));


mapHelpStrings.insert(std::make_pair("AAAAAAA",
""
));
Expand Down Expand Up @@ -3922,6 +3969,7 @@ void mc_InitRPCHelpMap()
mc_InitRPCHelpMap14();
mc_InitRPCHelpMap15();
mc_InitRPCHelpMap16();
mc_InitRPCHelpMap17();

mc_InitRPCLogParamCountMap();
mc_InitRPCAllowedWhenWaitingForUpgradeSet();
Expand Down
146 changes: 116 additions & 30 deletions src/rpc/rpcstreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1228,11 +1228,8 @@ void getSubKeyEntityFromPublisher(string str,mc_TxEntityStat entStat,mc_TxEntity
entity->m_EntityType=entStat.m_Entity.m_EntityType | MC_TET_SUBKEY;
}

Value getstreamkeysummary(const Array& params, bool fHelp)
Value getstreamsummary(const Array& params, bool fPublisher)
{
if (fHelp || params.size() != 3)
throw runtime_error("Help message not found\n");

if(mc_gState->m_Features->Streams() == 0)
{
throw JSONRPCError(RPC_NOT_SUPPORTED, "API is not supported for this protocol version");
Expand All @@ -1251,23 +1248,38 @@ Value getstreamkeysummary(const Array& params, bool fHelp)
entStat.Zero();
memcpy(&entStat,stream_entity.GetTxID()+MC_AST_SHORT_TXID_OFFSET,MC_AST_SHORT_TXID_SIZE);
entStat.m_Entity.m_EntityType=MC_TET_STREAM_KEY;
if(fPublisher)
{
entStat.m_Entity.m_EntityType=MC_TET_STREAM_PUBLISHER;
}
entStat.m_Entity.m_EntityType |= MC_TET_CHAINPOS;
if(!pwalletTxsMain->FindEntity(&entStat))
{
throw JSONRPCError(RPC_NOT_SUBSCRIBED, "Not subscribed to this stream");
}

string key_string=params[1].get_str();
bool fFirstPublisher=false;
bool fFirstPublisherAll=false;
string key_string=params[1].get_str();
const char *key_ptr=key_string.c_str();
getSubKeyEntityFromKey(params[1].get_str(),entStat,&entity);
if(fPublisher)
{
getSubKeyEntityFromPublisher(params[1].get_str(),entStat,&entity);
}
else
{
getSubKeyEntityFromKey(params[1].get_str(),entStat,&entity);
}

set<string> setFirstPublishers;

vector<string> inputStrings;
inputStrings=ParseStringList(params[2]);
uint32_t mode=0;
for(int j=0;j<(int)inputStrings.size();j++)
{
bool found=false;
if(inputStrings[j]=="jsonupdate")
if(inputStrings[j]=="jsonobjectmerge")
{
mode |= MC_VMM_MERGE_OBJECTS;
found=true;
Expand All @@ -1282,11 +1294,32 @@ Value getstreamkeysummary(const Array& params, bool fHelp)
mode |= MC_VMM_IGNORE;
found=true;
}
if(inputStrings[j]=="first")
if(inputStrings[j]=="noupdate")
{
mode |= MC_VMM_TAKE_FIRST;
mode |= MC_VMM_TAKE_FIRST_FOR_FIELD;
found=true;
}
if(inputStrings[j]=="omitnull")
{
mode |= MC_VMM_OMIT_NULL;
found=true;
}
if(!fPublisher)
{
if(inputStrings[j]=="firstpublisherany")
{
fFirstPublisher=true;
fFirstPublisherAll=false;
found=true;
}
if(inputStrings[j]=="firstpublisherall")
{
fFirstPublisher=true;
fFirstPublisherAll=true;
found=true;
}
}
if(!found)
{
throw JSONRPCError(RPC_INVALID_PARAMETER, "Unrecognized mode: " + inputStrings[j]);
Expand All @@ -1298,7 +1331,8 @@ Value getstreamkeysummary(const Array& params, bool fHelp)
entity_rows->Initialize(MC_TDB_ENTITY_KEY_SIZE,MC_TDB_ROW_SIZE,MC_BUF_MODE_DEFAULT);

Object empty_object;
int i,n,c,m,err;
Object obj;
int i,n,c,m,err,pcount;
err=MC_ERR_NOERROR;
n=pwalletTxsMain->GetListSize(&entity,entStat.m_Generation,NULL);
i=0;
Expand All @@ -1318,11 +1352,57 @@ Value getstreamkeysummary(const Array& params, bool fHelp)
pwalletTxsMain->GetList(&entity,entStat.m_Generation,i+1,c,entity_rows);
}
mc_TxEntityRow *lpEntTx;
lpEntTx=(mc_TxEntityRow*)entity_rows->GetRow(i);
lpEntTx=(mc_TxEntityRow*)entity_rows->GetRow(i % m);
uint256 hash;
int first_output=mc_GetHashAndFirstOutput(lpEntTx,&hash);
const CWalletTx& wtx=pwalletTxsMain->GetWalletTx(hash,NULL,NULL);
Object entry=StreamItemEntry(wtx,first_output,stream_entity.GetTxID()+MC_AST_SHORT_TXID_OFFSET,false,&key_ptr,NULL,NULL);
Object entry;
if(fPublisher)
{
entry=StreamItemEntry(wtx,first_output,stream_entity.GetTxID()+MC_AST_SHORT_TXID_OFFSET,false,NULL,&key_ptr,NULL);
}
else
{
entry=StreamItemEntry(wtx,first_output,stream_entity.GetTxID()+MC_AST_SHORT_TXID_OFFSET,false,&key_ptr,NULL,NULL);
}

if(fFirstPublisher)
{
pcount=0;
BOOST_FOREACH(const Pair& a, entry)
{
if(a.name_ == "publishers")
{
Array arr=a.value_.get_array();
if(i == 0)
{
setFirstPublishers.clear();
for(unsigned int j=0;j<arr.size();j++)
{
setFirstPublishers.insert(arr[j].get_str());
pcount++;
}
}
else
{
for(unsigned int j=0;j<arr.size();j++)
{
const set<string>::const_iterator it=setFirstPublishers.find(arr[j].get_str());
if(it != setFirstPublishers.end())
{
pcount++;
}
}
}
}
}
if( ( fFirstPublisherAll && (pcount != (int)setFirstPublishers.size())) ||
(!fFirstPublisherAll && (pcount == 0)) )
{
entry.clear();
}
}

BOOST_FOREACH(const Pair& a, entry)
{
if(a.name_ == "data")
Expand Down Expand Up @@ -1356,39 +1436,45 @@ Value getstreamkeysummary(const Array& params, bool fHelp)
i++;
}

if(result.type() == obj_type)
if(mc_IsJsonObjectForMerge(&result,0))
{
if(result.get_obj().size() == 1)
{
if(result.get_obj()[0].name_ == "json")
{
if(result.get_obj()[0].value_.type() == obj_type)
{
Value json=result.get_obj()[0].value_;
Value empty_value=empty_object;
Object obj;
json=mc_MergeValues(&json,&empty_value,mode | MC_VMM_TAKE_FIRST,1,&err);
obj.push_back(Pair("json", json));
result=obj;
}
}
}
}
Value json=result.get_obj()[0].value_;
Value empty_value=empty_object;
json=mc_MergeValues(&json,&empty_value,mode | MC_VMM_TAKE_FIRST,1,&err);
obj.push_back(Pair("json", json));
}
else
{
obj.push_back(Pair("json", empty_object));
}
result=obj;

exitlbl:

delete entity_rows;

if(err)
{
throw JSONRPCError(RPC_NOT_ALLOWED, "Could not merge stream items for this key");
throw JSONRPCError(RPC_NOT_ALLOWED, "Could not merge stream items for this " + (fPublisher ? string("publisher") : string("key")));
}

return result;
}

Value getstreamkeysummary(const Array& params, bool fHelp)
{
if (fHelp || params.size() != 3)
throw runtime_error("Help message not found\n");

return getstreamsummary(params,false);
}

Value getstreampublishersummary(const Array& params, bool fHelp)
{
return Value::null;
if (fHelp || params.size() != 3)
throw runtime_error("Help message not found\n");

return getstreamsummary(params,true);
}

Value liststreamkeyitems(const Array& params, bool fHelp)
Expand Down
34 changes: 31 additions & 3 deletions src/rpc/rpcutils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3402,6 +3402,25 @@ bool mc_IsJsonObjectForMerge(const Value *value,int level)
Value mc_MergeValues(const Value *value1,const Value *value2,uint32_t mode,int level,int *error)
{
int no_merge=0;

if(mode & MC_VMM_OMIT_NULL)
{
if(mode & MC_VMM_TAKE_FIRST)
{
if(value1->type() == null_type)
{
return Value::null;
}
}
else
{
if(value2->type() == null_type)
{
return Value::null;
}
}
}

bool value1_is_obj=mc_IsJsonObjectForMerge(value1,level);
bool value2_is_obj=mc_IsJsonObjectForMerge(value2,level);
if( (mode & MC_VMM_MERGE_OBJECTS) == 0)
Expand Down Expand Up @@ -3502,7 +3521,10 @@ Value mc_MergeValues(const Value *value1,const Value *value2,uint32_t mode,int l
map<string, Value>::iterator it2 = map2.find(it1->first);
if( it2 == map2.end() )
{
result.push_back(Pair(it1->first, it1->second));
if( ((mode & MC_VMM_OMIT_NULL) == 0) || (it1->second.type() != null_type) )
{
result.push_back(Pair(it1->first, it1->second));
}
}
else
{
Expand All @@ -3511,14 +3533,20 @@ Value mc_MergeValues(const Value *value1,const Value *value2,uint32_t mode,int l
{
return Value::null;
}
result.push_back(Pair(it1->first, merged));
if( ((mode & MC_VMM_OMIT_NULL) == 0) || (merged.type() != null_type) )
{
result.push_back(Pair(it1->first, merged));
}
map2.erase(it2);
}
}

for(map<string,Value>::iterator it2 = map2.begin(); it2 != map2.end(); ++it2)
{
result.push_back(Pair(it2->first, it2->second));
if( ((mode & MC_VMM_OMIT_NULL) == 0) || (it2->second.type() != null_type) )
{
result.push_back(Pair(it2->first, it2->second));
}
}

return result;
Expand Down
2 changes: 2 additions & 0 deletions src/rpc/rpcutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ using namespace json_spirit;
#define MC_VMM_IGNORE 0x00000004
#define MC_VMM_TAKE_FIRST 0x00000008
#define MC_VMM_TAKE_FIRST_FOR_FIELD 0x00000010
#define MC_VMM_OMIT_NULL 0x00000020


// codes for allowed_objects fields
Expand Down Expand Up @@ -107,6 +108,7 @@ bool paramtobool(Value param);
int paramtoint(Value param,bool check_for_min,int min_value,string error_message);
vector<int> ParseBlockSetIdentifier(Value blockset_identifier);
vector<unsigned char> ParseRawFormattedData(const Value *value,uint32_t *data_format,mc_Script *lpDetailsScript,int *errorCode,string *strError);
bool mc_IsJsonObjectForMerge(const Value *value,int level);
Value mc_MergeValues(const Value *value1,const Value *value2,uint32_t mode,int level,int *error);


Expand Down

0 comments on commit c6facdd

Please sign in to comment.