Skip to content

Commit

Permalink
Update MDATP Driver for delegated auth (#784)
Browse files Browse the repository at this point in the history
* Add driver and docs

* Add M365DGraph data environment to compatible driver list

* Remove warnings import

* Fix numpy 2.0 NaN now being nan test failure

* Default to delegated auth when username is present in config or cs

---------

Co-authored-by: Ryan Cobb <[email protected]>
Co-authored-by: Ian Hellen <[email protected]>
  • Loading branch information
3 people authored Dec 6, 2024
1 parent f197a68 commit 337632d
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 57 deletions.
38 changes: 33 additions & 5 deletions docs/source/data_acquisition/DataProv-MSDefender.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,37 @@ M365 Defender Configuration
Creating a Client App for M365 Defender
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Details on registering an Azure AD application for MS 365 Defender can be found
`here <https://docs.microsoft.com/windows/security/threat-protection/microsoft-defender-atp/exposed-apis-create-app-webapp>`__.
Microsoft 365 Defender APIs can be accessed in both `application <https://learn.microsoft.com/en-us/defender-endpoint/api/exposed-apis-create-app-webapp>`
and `delegated user contexts <https://learn.microsoft.com/en-us/defender-endpoint/api/exposed-apis-create-app-nativeapp>`.
Accessing Microsoft 365 Defender APIs as an application requires
either a client secret or certificate, while delegated user auth requires
an interactive signin through a browser or via device code.

As such, the details on registering an Azure AD application for MS 365 Defender
are different for application and delegated user auth scenarios. Please
see the above links for more information. Notably, delegated user auth
scenarios do not require a application credential and thus is preferrable.

For delegated user auth scenarios, ensure that the application has a
"Mobile or Desktop Application" redirect URI configured as `http://localhost`.
A redirect URI is not required for applications with their own credentials.

API permissions for the client application will require tenant admin consent.
Ensure that the consented permissions are correct for the chosen data environment
and auth scenario (application or delegated user):

+-----------------------------+------------------------+------------------+
| API Name | Permission | Data Environment |
+=============================+========================+==================+
| WindowsDefenderATP | AdvancedQuery.Read | MDE, MDATP |
+-----------------------------+------------------------+------------------+
| Microsoft Threat Protection | AdvancedHunting.Read | M365D |
+-----------------------------+------------------------+------------------+
| Microsoft Graph | ThreatHunting.Read.All | M365DGraph |
+-----------------------------+------------------------+------------------+

Once you have registered the application, you can use it to connect to
the MS Defender API.
the MS Defender API using the chosen data environment.

M365 Defender Configuration in MSTICPy
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -39,13 +66,13 @@ The settings in the file should look like the following:
MicrosoftDefender:
Args:
ClientId: "CLIENT ID"
ClientSecret: "CLIENT SECRET"
TenantId: "TENANT ID"
UserName: "User Name"
Cloud: "global"
We strongly recommend storing the client secret value
If connecting to the MS Defender 365 API using application auth,
we strongly recommend storing the client secret value
in Azure Key Vault. You can replace the text value with a referenced
to a Key Vault secret using the MSTICPy configuration editor.
See :doc:`msticpy Settings Editor <../getting_started/SettingsEditor>`)
Expand Down Expand Up @@ -166,6 +193,7 @@ the required parameters are:
* client_secret -- The secret used for by the application.
* username -- If using delegated auth for your application.

The client_secret and username parameters are mutually exclusive.

.. code:: ipython3
Expand Down
1 change: 1 addition & 0 deletions msticpy/data/core/data_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"mde": ["m365d"],
"mssentinel_new": ["mssentinel", "m365d"],
"kusto_new": ["kusto"],
"m365dgraph": ["mde", "m365d"],
}

logger: logging.Logger = logging.getLogger(__name__)
Expand Down
145 changes: 97 additions & 48 deletions msticpy/data/drivers/mdatp_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
# license information.
# --------------------------------------------------------------------------
"""MDATP OData Driver class."""
from typing import Any, Optional, Union
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

import pandas as pd

Expand All @@ -24,6 +25,33 @@
__author__ = "Pete Bryan"


@dataclass
class M365DConfiguration:
"""A container for M365D API settings.
This is based on the data environment of the query provider.
"""

login_uri: str
resource_uri: str
api_version: str
api_endpoint: str
api_uri: str
scopes: List[str]
oauth_v2: bool = field(init=False)

def __post_init__(self):
"""Determine if the selected API supports Entra ID OAuth v2.0.
This is important because the fields in the request body
are different between the two versions.
"""
if "/oauth2/v2.0" in self.login_uri:
self.oauth_v2 = True
else:
self.oauth_v2 = False


@export
class MDATPDriver(OData):
"""KqlDriver class to retrieve date from MS Defender APIs."""
Expand All @@ -46,6 +74,7 @@ def __init__(
"""
super().__init__(**kwargs)

cs_dict = _get_driver_settings(
self.CONFIG_NAME, self._ALT_CONFIG_NAMES, instance
)
Expand All @@ -54,40 +83,37 @@ def __init__(
if "cloud" in kwargs and kwargs["cloud"]:
self.cloud = kwargs["cloud"]

api_uri, oauth_uri, api_suffix = _select_api_uris(
self.data_environment, self.cloud
)
m365d_params = _select_api(self.data_environment, self.cloud)
self._m365d_params: M365DConfiguration = m365d_params
self.oauth_url = m365d_params.login_uri
self.api_root = m365d_params.resource_uri
self.api_ver = m365d_params.api_version
self.api_suffix = m365d_params.api_endpoint
self.scopes = m365d_params.scopes

self.add_query_filter(
"data_environments", ("MDE", "M365D", "MDATP", "GraphHunting")
"data_environments", ("MDE", "M365D", "MDATP", "M365DGraph", "GraphHunting")
)

self.req_body = {
"client_id": None,
"client_secret": None,
"grant_type": "client_credentials",
"resource": api_uri,
}
self.oauth_url = oauth_uri
self.api_root = api_uri
self.api_ver = "api"
self.api_suffix = api_suffix
if self.data_environment == DataEnvironment.M365D:
self.scopes = [f"{api_uri}/AdvancedHunting.Read"]
elif self.data_environment == DataEnvironment.M365DGraph:
self.api_ver = kwargs.get("api_ver", "v1.0")
self.req_body = {
"client_id": None,
"client_secret": None,
"grant_type": "client_credentials",
"scope": f"{self.api_root}.default",
}
self.scopes = [f"{api_uri}/ThreatHunting.Read.All"]
self.req_body: Dict[str, Any] = {}
if "username" in cs_dict:
delegated_auth = True

else:
self.scopes = [f"{api_uri}/AdvancedQuery.Read"]
delegated_auth = False
self.req_body["grant_type"] = "client_credentials"

if not m365d_params.oauth_v2:
self.req_body["resource"] = self.scopes

if connection_str:
self.current_connection = connection_str
self.connect(connection_str)
self.connect(
connection_str,
delegated_auth=delegated_auth,
auth_type=kwargs.get("auth_type", "interactive"),
location=cs_dict.get("location", "token_cache.bin"),
)

def query(
self, query: str, query_source: Optional[QuerySource] = None, **kwargs
Expand Down Expand Up @@ -135,26 +161,49 @@ def query(
return response


def _select_api_uris(data_environment, cloud):
"""Return API and login URIs for selected provider type."""
login_uri = get_m365d_login_endpoint(cloud)
if data_environment == DataEnvironment.M365D:
return (
get_m365d_endpoint(cloud),
f"{login_uri}{{tenantId}}/oauth2/token",
"/advancedhunting/run",
)
def _select_api(data_environment, cloud) -> M365DConfiguration:
# pylint: disable=line-too-long
"""Return API and login URIs for selected provider type.
Note that the Microsoft Graph is the preferred API.
| API Name | Resource ID | Scopes Requested | API URI (global cloud) | API Endpoint | Login URI | MSTICpy Data Environment |
| -------- | ----------- | ---------------- | ---------------------- | ------------ | --------- | ------------------------ |
| WindowsDefenderATP | fc780465-2017-40d4-a0c5-307022471b92 | `AdvancedQuery.Read` | `https://api.securitycenter.microsoft.com` | `/advancedqueries/run` | `https://login.microsoftonline.com/<tenantId>/oauth2/token` | `MDE`, `MDATP` |
| Microsoft Threat Protection | 8ee8fdad-f234-4243-8f3b-15c294843740 | `AdvancedHunting.Read` | `https://api.security.microsoft.com` | `/advancedhunting/run` | `https://login.microsoftonline.com/<tenantId>/oauth2/token` | `M365D` |
| Microsoft Graph | 00000003-0000-0000-c000-000000000000 | `ThreatHunting.Read.All` | `https://graph.microsoft.com/<version>/` | `/security/runHuntingQuery` | `https://login.microsoftonline.com/<tenantId>/oauth2/v2.0/token` | `M365DGraph` |
"""
# pylint: enable=line-too-long
if data_environment == DataEnvironment.M365DGraph:
az_cloud_config = AzureCloudConfig(cloud=cloud)
api_uri = az_cloud_config.endpoints.get("microsoftGraphResourceId")
graph_login = az_cloud_config.authority_uri
return (
api_uri,
f"{graph_login}{{tenantId}}/oauth2/v2.0/token",
"/security/runHuntingQuery",
)
return (
get_defender_endpoint(cloud),
f"{login_uri}{{tenantId}}/oauth2/token",
"/advancedqueries/run",
login_uri = f"{az_cloud_config.authority_uri}{{tenantId}}/oauth2/v2.0/token"
resource_uri = az_cloud_config.endpoints["microsoftGraphResourceId"]
api_version = "v1.0"
api_endpoint = "/security/runHuntingQuery"
scopes = [f"{resource_uri}ThreatHunting.Read.All"]

elif data_environment == DataEnvironment.M365D:
login_uri = f"{get_m365d_login_endpoint(cloud)}{{tenantId}}/oauth2/token"
resource_uri = get_m365d_endpoint(cloud)
api_version = "api"
api_endpoint = "/advancedhunting/run"
scopes = [f"{resource_uri}AdvancedHunting.Read"]

else:
login_uri = f"{get_m365d_login_endpoint(cloud)}{{tenantId}}/oauth2/token"
resource_uri = get_defender_endpoint(cloud)
api_version = "api"
api_endpoint = "/advancedqueries/run"
scopes = [f"{resource_uri}AdvancedQuery.Read"]

api_uri = f"{resource_uri}{api_version}{api_endpoint}"

return M365DConfiguration(
login_uri=login_uri,
resource_uri=resource_uri,
api_version=api_version,
api_endpoint=api_endpoint,
api_uri=api_uri,
scopes=scopes,
)
11 changes: 7 additions & 4 deletions msticpy/data/drivers/odata_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,14 @@ def connect(
help_uri=("Connecting to OData sources.", _HELP_URI),
)

# Default to using application based authentication
if not delegated_auth:
json_response = self._get_token_standard_auth(kwargs, cs_dict)
else:
# Default to using delegated auth if username is present
if "username" in cs_dict:
delegated_auth = True

if delegated_auth:
json_response = self._get_token_delegate_auth(kwargs, cs_dict)
else:
json_response = self._get_token_standard_auth(kwargs, cs_dict)

self.req_headers["Authorization"] = f"Bearer {self.aad_token}"
self.api_root = cs_dict.get("apiRoot", self.api_root)
Expand Down

0 comments on commit 337632d

Please sign in to comment.