@@ -164,13 +164,13 @@ Status Pipeline::execute(ExecutionContext context) {
164
164
DeferredNodeSessions tmpDeferredNodeSessions;
165
165
for (auto & nextNode : nextNodesFromFinished) {
166
166
auto readySessions = nextNode.get ().getReadySessions ();
167
- for (auto & sessionKey : readySessions) {
168
- SPDLOG_LOGGER_DEBUG (dag_executor_logger, " Started execution of pipeline: {} node: {} session: {}" , getName (), nextNode.get ().getName (), sessionKey );
169
- startedSessions.emplace (nextNode.get ().getName () + sessionKey );
170
- status = nextNode.get ().execute (sessionKey , finishedNodeQueue);
167
+ for (auto & readySessionKey : readySessions) {
168
+ SPDLOG_LOGGER_DEBUG (dag_executor_logger, " Started execution of pipeline: {} node: {} session: {}" , getName (), nextNode.get ().getName (), readySessionKey );
169
+ startedSessions.emplace (nextNode.get ().getName () + readySessionKey );
170
+ status = nextNode.get ().execute (readySessionKey , finishedNodeQueue);
171
171
if (status == StatusCode::PIPELINE_STREAM_ID_NOT_READY_YET) {
172
- SPDLOG_LOGGER_DEBUG (dag_executor_logger, " Node: {} session: {} not ready for execution yet" , nextNode.get ().getName (), sessionKey );
173
- tmpDeferredNodeSessions.emplace_back (nextNode.get (), sessionKey );
172
+ SPDLOG_LOGGER_DEBUG (dag_executor_logger, " Node: {} session: {} not ready for execution yet" , nextNode.get ().getName (), readySessionKey );
173
+ tmpDeferredNodeSessions.emplace_back (nextNode.get (), readySessionKey );
174
174
status = StatusCode::OK;
175
175
}
176
176
CHECK_AND_LOG_ERROR (nextNode.get ())
@@ -192,18 +192,18 @@ Status Pipeline::execute(ExecutionContext context) {
192
192
if (finishedNodeQueue.size () > 0 ) {
193
193
break ;
194
194
}
195
- auto & [nodeRef, sessionKey ] = *it;
195
+ auto & [nodeRef, deferredSessionKey ] = *it;
196
196
auto & node = nodeRef.get ();
197
- SPDLOG_LOGGER_DEBUG (dag_executor_logger, " Trying to trigger node: {} session: {} execution" , node.getName (), sessionKey );
198
- status = node.execute (sessionKey , finishedNodeQueue);
197
+ SPDLOG_LOGGER_DEBUG (dag_executor_logger, " Trying to trigger node: {} session: {} execution" , node.getName (), deferredSessionKey );
198
+ status = node.execute (deferredSessionKey , finishedNodeQueue);
199
199
if (status.ok ()) {
200
- SPDLOG_LOGGER_DEBUG (dag_executor_logger, " Node: {} session: {} is ready" , node.getName (), sessionKey );
200
+ SPDLOG_LOGGER_DEBUG (dag_executor_logger, " Node: {} session: {} is ready" , node.getName (), deferredSessionKey );
201
201
it = deferredNodeSessions.erase (it);
202
202
continue ;
203
203
}
204
204
it++;
205
205
if (status == StatusCode::PIPELINE_STREAM_ID_NOT_READY_YET) {
206
- SPDLOG_LOGGER_DEBUG (dag_executor_logger, " Node: {} session: {} not ready for execution yet" , node.getName (), sessionKey );
206
+ SPDLOG_LOGGER_DEBUG (dag_executor_logger, " Node: {} session: {} not ready for execution yet" , node.getName (), deferredSessionKey );
207
207
status = StatusCode::OK;
208
208
} else {
209
209
CHECK_AND_LOG_ERROR (node)
0 commit comments