Skip to content

Commit

Permalink
allow to use in single threaded mode
Browse files Browse the repository at this point in the history
(some client side channels and all server side channels still need to be
ported to new api)

server: build fix, do not disable threads for rfx encoder

cliprdr client channel: implemented support for DisableThreads option
looks like thread does not make sense at all for this channel

do not initialize disabled image codecs (respect settings)

channels: client: rail: added support for DisableThreads setting

changed "BOOL DisableThreads" to "UINT32 ThreadingFlags"
dropped unnecessary apu changes

draft implementation of threading settings aware message handling api
for addins/channels

rail: use new messaging api

fixed memory leak

msgs handlers external api changes (as requested)

msgs_handlers: init fix

fixed memory leak

logic fix

resolved problems appeared after rebase to master, dropped unnecessary
changes

git clang-format origin/master

fixed TestFreeRDPCodecRemoteFX.c

"formatting, run `clang-format` please"

properly use new "rfx_context_new(BOOL, UINT32)" everywhere

passed Threading Flags to "rfx_context_new" where available

in older C standarts veriables declaration must be done before any code

requested changes

clang-format as requested

use broken signatures of standert C functions for m$ s**tos

clang-format

requested changes

requested changes

moved ThreadingFlags to stable api zone

define type for channel msg handler

typo fix

clang-format

build fix

us ThreadingFlags from server settings

git clang-format origin/master

clang-format
  • Loading branch information
Gluzskiy Alexandr authored and akallabeth committed Feb 25, 2021
1 parent bf9bce2 commit bee2e15
Show file tree
Hide file tree
Showing 60 changed files with 527 additions and 467 deletions.
215 changes: 215 additions & 0 deletions channels/client/addin.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,218 @@ PVIRTUALCHANNELENTRY freerdp_channels_load_static_addin_entry(LPCSTR pszName, LP

return NULL;
}

typedef struct
{
wMessageQueue* queue;
wStream* data_in;
HANDLE thread;
char* channel_name;
rdpContext* ctx;
LPVOID userdata;
MsgHandler msg_handler;
} msg_proc_internals;

static DWORD WINAPI channel_client_thread_proc(LPVOID userdata)
{
UINT error = CHANNEL_RC_OK;
wStream* data;
wMessage message;
msg_proc_internals* internals = userdata;
if (!internals)
{
/* TODO: return some error */
}
while (1)
{
if (!MessageQueue_Wait(internals->queue))
{
WLog_ERR(TAG, "MessageQueue_Wait failed!");
error = ERROR_INTERNAL_ERROR;
break;
}
if (!MessageQueue_Peek(internals->queue, &message, TRUE))
{
WLog_ERR(TAG, "MessageQueue_Peek failed!");
error = ERROR_INTERNAL_ERROR;
break;
}

if (message.id == WMQ_QUIT)
break;

if (message.id == 0)
{
data = (wStream*)message.wParam;

if ((error = internals->msg_handler(internals->userdata, data)))
{
WLog_ERR(TAG, "msg_handler failed with error %" PRIu32 "!", error);
break;
}
}
}
if (error && internals->ctx)
{
char msg[128];
_snprintf(msg, 127,
"%s_virtual_channel_client_thread reported an"
" error",
internals->channel_name);
setChannelError(internals->ctx, error, msg);
}
ExitThread(error);
return error;
}

static void free_msg(void* obj)
{
wMessage* msg = (wMessage*)obj;

if (msg)
{
wStream* s = (wStream*)msg->wParam;
Stream_Free(s, TRUE);
}
}

/* Create message queue and thread or not, depending on settings */
void* channel_client_create_handler(rdpContext* ctx, LPVOID userdata, MsgHandler msg_handler,
const char* channel_name)
{
msg_proc_internals* internals = calloc(1, sizeof(msg_proc_internals));
if (!internals)
{
WLog_ERR(TAG, "calloc failed!");
return 0;
}
internals->msg_handler = msg_handler;
internals->userdata = userdata;
internals->channel_name = strdup(channel_name);
internals->ctx = ctx;
if (!(ctx->settings->ThreadingFlags & THREADING_FLAGS_DISABLE_THREADS))
{
wObject obj = { 0 };
obj.fnObjectFree = free_msg;
internals->queue = MessageQueue_New(&obj);
if (!internals->queue)
{
WLog_ERR(TAG, "MessageQueue_New failed!");
return 0;
}

if (!(internals->thread =
CreateThread(NULL, 0, channel_client_thread_proc, (void*)internals, 0, NULL)))
{
WLog_ERR(TAG, "CreateThread failed!");
MessageQueue_Free(internals->queue);
internals->queue = NULL;
}
}
return internals;
}
/* post a message in the queue or directly call the processing handler */
UINT channel_client_post_message(void* MsgsHandle, LPVOID pData, UINT32 dataLength,
UINT32 totalLength, UINT32 dataFlags)
{
msg_proc_internals* internals = MsgsHandle;
wStream* data_in;

if (!internals)
{
/* TODO: return some error here */
return CHANNEL_RC_OK;
}

if ((dataFlags & CHANNEL_FLAG_SUSPEND) || (dataFlags & CHANNEL_FLAG_RESUME))
{
return CHANNEL_RC_OK;
}

if (dataFlags & CHANNEL_FLAG_FIRST)
{
if (internals->data_in)
Stream_Free(internals->data_in, TRUE);

internals->data_in = Stream_New(NULL, totalLength);
}

if (!(data_in = internals->data_in))
{
WLog_ERR(TAG, "Stream_New failed!");
return CHANNEL_RC_NO_MEMORY;
}

if (!Stream_EnsureRemainingCapacity(data_in, dataLength))
{
Stream_Free(internals->data_in, TRUE);
internals->data_in = NULL;
return CHANNEL_RC_NO_MEMORY;
}

Stream_Write(data_in, pData, dataLength);

if (dataFlags & CHANNEL_FLAG_LAST)
{
if (Stream_Capacity(data_in) != Stream_GetPosition(data_in))
{
char msg[128];
_snprintf(msg, 127, "%s_plugin_process_received: read error", internals->channel_name);
WLog_ERR(TAG, msg);
return ERROR_INTERNAL_ERROR;
}

internals->data_in = NULL;
Stream_SealLength(data_in);
Stream_SetPosition(data_in, 0);

if (internals->ctx->settings->ThreadingFlags & THREADING_FLAGS_DISABLE_THREADS)
{
UINT error = CHANNEL_RC_OK;
if ((error = internals->msg_handler(internals->userdata, data_in)))
{
WLog_ERR(TAG,
"msg_handler failed with error"
" %" PRIu32 "!",
error);
return ERROR_INTERNAL_ERROR;
}
}
}
return CHANNEL_RC_OK;
}
/* Tear down queue and thread */
UINT channel_client_quit_handler(void* MsgsHandle)
{
msg_proc_internals* internals = MsgsHandle;
UINT rc;
if (!internals)
{
/* TODO: return some error here */
return CHANNEL_RC_OK;
}
if (!(internals->ctx->settings->ThreadingFlags & THREADING_FLAGS_DISABLE_THREADS))
{
if (MessageQueue_PostQuit(internals->queue, 0) &&
(WaitForSingleObject(internals->thread, INFINITE) == WAIT_FAILED))
{
rc = GetLastError();
WLog_ERR(TAG, "WaitForSingleObject failed with error %" PRIu32 "", rc);
return rc;
}
MessageQueue_Free(internals->queue);
CloseHandle(internals->thread);
}
if (internals->data_in)
{
Stream_Free(internals->data_in, TRUE);
internals->data_in = NULL;
}
if (internals->channel_name)
{
free(internals->channel_name);
}

free(internals);
return CHANNEL_RC_OK;
}
10 changes: 10 additions & 0 deletions channels/client/addin.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

typedef UINT (*MsgHandler)(LPVOID userdata, wStream* data);

FREERDP_API void* channel_client_create_handler(rdpContext* ctx, LPVOID userdata, MsgHandler,
const char* channel_name);

UINT channel_client_post_message(void* MsgsHandle, LPVOID pData, UINT32 dataLength,
UINT32 totalLength, UINT32 dataFlags);

UINT channel_client_quit_handler(void* MsgsHandle);
Loading

0 comments on commit bee2e15

Please sign in to comment.