Skip to content

Commit

Permalink
[C++]: added sending auth connect request and credential supplier for…
Browse files Browse the repository at this point in the history
… archive authentication.
  • Loading branch information
tmontgomery committed Nov 25, 2019
1 parent 4740e52 commit 64c1c9f
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 6 deletions.
10 changes: 9 additions & 1 deletion aeron-archive/src/main/cpp/client/AeronArchive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,20 @@ std::shared_ptr<AeronArchive> AeronArchive::AsyncConnect::poll()

if (2 == m_step)
{
auto encodedCredentials = m_ctx->credentialsSupplier().m_encodedCredentials();

if (!m_archiveProxy->tryConnect(
m_ctx->controlResponseChannel(), m_ctx->controlResponseStreamId(), m_correlationId))
m_ctx->controlResponseChannel(),
m_ctx->controlResponseStreamId(),
encodedCredentials,
m_correlationId))
{
m_ctx->credentialsSupplier().m_onFree(encodedCredentials);
return std::shared_ptr<AeronArchive>();
}

m_ctx->credentialsSupplier().m_onFree(encodedCredentials);

m_step = 3;
}

Expand Down
84 changes: 84 additions & 0 deletions aeron-archive/src/main/cpp/client/ArchiveConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,66 @@ constexpr const std::int64_t NULL_POSITION = aeron::NULL_VALUE;
*/
constexpr const std::int64_t NULL_LENGTH = aeron::NULL_VALUE;

/**
* Callback to return encoded credentials.
*
* @return encoded credentials to send on connect request.
*/
typedef std::function<std::pair<const char *, std::uint32_t>()> credentials_encoded_credentials_supplier_t;

inline std::pair<const char *, std::uint32_t> defaultCredentialsEncodedCredentials()
{
return {nullptr, 0};
}

/**
* Callback to return encoded credentials given specific encoded challenge
*
* @param encodedChallenge to use to generate the encoded credentials.
* @return encoded credentials to send in challenge response.
*/
typedef std::function<std::pair<const char *, std::uint32_t>(
std::pair<const char *, std::uint32_t> encodedChallenge)> credentials_challenge_supplier_t;

inline std::pair<const char *, std::uint32_t> defaultCredentialsOnChallenge(
std::pair<const char *, std::uint32_t> encodedChallenge)
{
return {nullptr, 0};
}

/**
* Callback to return encoded credentials so they may be reused or free'd.
*
* @param encodedCredentials to re-use or free.
*/
typedef std::function<void(std::pair<const char *, std::uint32_t> encodedCredentials)> credentials_free_t;

inline void defaultCredentialsOnFree(std::pair<const char *, std::uint32_t> credentials)
{
delete [] credentials.first;
}

/**
* Structure to hold credential callbacks.
*/
typedef struct CredentialsSupplierDefn
{
credentials_encoded_credentials_supplier_t m_encodedCredentials = defaultCredentialsEncodedCredentials;
credentials_challenge_supplier_t m_onChallenge = defaultCredentialsOnChallenge;
credentials_free_t m_onFree = defaultCredentialsOnFree;

explicit CredentialsSupplierDefn(
credentials_encoded_credentials_supplier_t encodedCredentials = defaultCredentialsEncodedCredentials,
credentials_challenge_supplier_t onChallenge = defaultCredentialsOnChallenge,
credentials_free_t onFree = defaultCredentialsOnFree) :
m_encodedCredentials(std::move(encodedCredentials)),
m_onChallenge(std::move(onChallenge)),
m_onFree(std::move(onFree))
{
}
}
CredentialsSupplier;

namespace Configuration
{
constexpr const std::uint8_t ARCHIVE_MAJOR_VERSION = 1;
Expand Down Expand Up @@ -440,6 +500,28 @@ class Context
return *this;
}

/**
* Get the credential supplier that will be called for generating encoded credentials.
*
* @return the credential supplier that will be called for generating encoded credentials.
*/
inline CredentialsSupplier& credentialsSupplier()
{
return m_credentialsSupplier;
}

/**
* Set the CredentialSupplier functions to be called as connect requests are handled.
*
* @param supplier that holds functions to be called.
* @return this for a fluent API.
*/
inline this_t& credentialsSupplier(const CredentialsSupplier& supplier)
{
m_credentialsSupplier = supplier;
return *this;
}

private:
std::shared_ptr<Aeron> m_aeron;
std::string m_aeronDirectoryName = aeron::Context::defaultAeronPath();
Expand All @@ -461,6 +543,8 @@ class Context
bool m_ownsAeronClient = false;

exception_handler_t m_errorHandler = nullptr;

CredentialsSupplier m_credentialsSupplier;
};

}}}
Expand Down
8 changes: 5 additions & 3 deletions aeron-archive/src/main/cpp/client/ArchiveProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "ArchiveProxy.h"
#include "concurrent/YieldingIdleStrategy.h"
#include "aeron_archive_client/BoundedReplayRequest.h"
#include "aeron_archive_client/ConnectRequest.h"
#include "aeron_archive_client/AuthConnectRequest.h"
#include "aeron_archive_client/CloseSessionRequest.h"
#include "aeron_archive_client/StartRecordingRequest.h"
#include "aeron_archive_client/ExtendRecordingRequest.h"
Expand Down Expand Up @@ -66,15 +66,17 @@ util::index_t ArchiveProxy::connectRequest(
AtomicBuffer& buffer,
const std::string& responseChannel,
std::int32_t responseStreamId,
std::pair<const char *, std::uint32_t> encodedCredentials,
std::int64_t correlationId)
{
ConnectRequest request;
AuthConnectRequest request;

wrapAndApplyHeader(request, buffer)
.correlationId(correlationId)
.responseStreamId(responseStreamId)
.version(Configuration::ARCHIVE_SEMANTIC_VERSION)
.putResponseChannel(responseChannel);
.putResponseChannel(responseChannel)
.putEncodedCredentials(encodedCredentials.first, encodedCredentials.second);

return messageAndHeaderLength(request);
}
Expand Down
16 changes: 14 additions & 2 deletions aeron-archive/src/main/cpp/client/ArchiveProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define AERON_ARCHIVE_ARCHIVE_PROXY_H

#include <array>
#include <utility>

#include "Aeron.h"
#include "concurrent/BackOffIdleStrategy.h"
Expand Down Expand Up @@ -58,12 +59,22 @@ class ArchiveProxy
*
* @param responseChannel for the control message responses.
* @param responseStreamId for the control message responses.
* @param encodedCredentials for the connect request.
* @param correlationId for this request.
* @return true if successfully offered otherwise false.
*/
bool tryConnect(const std::string& responseChannel, std::int32_t responseStreamId, std::int64_t correlationId)
bool tryConnect(
const std::string& responseChannel,
std::int32_t responseStreamId,
std::pair<const char *, std::uint32_t> encodedCredentials,
std::int64_t correlationId)
{
const util::index_t length = connectRequest(m_buffer, responseChannel, responseStreamId, correlationId);
const util::index_t length = connectRequest(
m_buffer,
responseChannel,
responseStreamId,
encodedCredentials,
correlationId);

return m_publication->offer(m_buffer, 0, length) > 0;
}
Expand Down Expand Up @@ -730,6 +741,7 @@ class ArchiveProxy
AtomicBuffer& buffer,
const std::string& responseChannel,
std::int32_t responseStreamId,
std::pair<const char *, std::uint32_t> encodedCredentials,
std::int64_t correlationId);

static util::index_t closeSession(AtomicBuffer& buffer, std::int64_t controlSessionId);
Expand Down

0 comments on commit 64c1c9f

Please sign in to comment.