Skip to content

Commit

Permalink
[RORDEV-757] Data streams rule (#872)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateuszkp96 committed Jan 29, 2023
1 parent 641f254 commit d034631
Show file tree
Hide file tree
Showing 180 changed files with 5,754 additions and 2,802 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
*/
package tech.beshu.ror.accesscontrol.blocks

import tech.beshu.ror.accesscontrol.blocks.BlockContext.DataStreamRequestBlockContext.BackingIndices
import tech.beshu.ror.accesscontrol.blocks.BlockContext.MultiIndexRequestBlockContext.Indices
import tech.beshu.ror.accesscontrol.blocks.BlockContext.TemplateRequestBlockContext.TemplatesTransformation
import tech.beshu.ror.accesscontrol.blocks.BlockContextUpdater.{AliasRequestBlockContextUpdater, RepositoryRequestBlockContextUpdater, SnapshotRequestBlockContextUpdater, TemplateRequestBlockContextUpdater}
import tech.beshu.ror.accesscontrol.blocks.BlockContextUpdater._
import tech.beshu.ror.accesscontrol.blocks.metadata.UserMetadata
import tech.beshu.ror.accesscontrol.domain.FieldLevelSecurity.RequestFieldsUsage
import tech.beshu.ror.accesscontrol.domain._
Expand Down Expand Up @@ -73,6 +74,22 @@ object BlockContext {
allAllowedIndices: Set[ClusterIndexName])
extends BlockContext

final case class DataStreamRequestBlockContext(override val requestContext: RequestContext,
override val userMetadata: UserMetadata,
override val responseHeaders: Set[Header],
override val responseTransformations: List[ResponseTransformation],
dataStreams: Set[DataStreamName],
backingIndices: DataStreamRequestBlockContext.BackingIndices)
extends BlockContext
object DataStreamRequestBlockContext {
sealed trait BackingIndices
object BackingIndices {
final case class IndicesInvolved(filteredIndices: Set[ClusterIndexName],
allAllowedIndices: Set[ClusterIndexName]) extends BackingIndices
case object IndicesNotInvolved extends BackingIndices
}
}

final case class AliasRequestBlockContext(override val requestContext: RequestContext,
override val userMetadata: UserMetadata,
override val responseHeaders: Set[Header],
Expand Down Expand Up @@ -166,6 +183,13 @@ object BlockContext {
override def indices(blockContext: SnapshotRequestBlockContext): Set[ClusterIndexName] = blockContext.filteredIndices
}

implicit val indicesFromDataStreamRequestBlockContext = new HasIndices[DataStreamRequestBlockContext] {
override def indices(blockContext: DataStreamRequestBlockContext): Set[ClusterIndexName] = blockContext.backingIndices match {
case BackingIndices.IndicesInvolved(filteredIndices, allAllowedIndices) => filteredIndices
case BackingIndices.IndicesNotInvolved => Set.empty
}
}

implicit class Ops[B <: BlockContext : HasIndices](blockContext: B) {
def indices: Set[ClusterIndexName] = HasIndices[B].indices(blockContext)
}
Expand Down Expand Up @@ -278,6 +302,12 @@ object BlockContext {
}
}

implicit class DataStreamOperationBlockContextUpdateOps(val blockContext: DataStreamRequestBlockContext) extends AnyVal {
def withDataStreams(dataStreams: Set[DataStreamName]): DataStreamRequestBlockContext = {
DataStreamRequestBlockContextUpdater.withDataStreams(blockContext, dataStreams)
}
}

implicit class BlockContextWithIndicesUpdaterOps[B <: BlockContext : BlockContextWithIndicesUpdater](blockContext: B) {
def withIndices(filteredIndices: Set[ClusterIndexName], allAllowedIndices: Set[ClusterIndexName]): B = {
BlockContextWithIndicesUpdater[B].withIndices(blockContext, filteredIndices, allAllowedIndices)
Expand Down Expand Up @@ -331,6 +361,10 @@ object BlockContext {
case _: GeneralNonIndexRequestBlockContext => Set.empty
case _: RepositoryRequestBlockContext => Set.empty
case bc: SnapshotRequestBlockContext => bc.filteredIndices
case bc: DataStreamRequestBlockContext => bc.backingIndices match {
case BackingIndices.IndicesInvolved(filteredIndices, allAllowedIndices) => filteredIndices
case BackingIndices.IndicesNotInvolved => Set.empty
}
case bc: TemplateRequestBlockContext =>
bc.templateOperation match {
case TemplateOperation.GettingLegacyAndIndexTemplates(_, _) => Set.empty
Expand Down Expand Up @@ -371,6 +405,7 @@ object BlockContext {
case _: GeneralNonIndexRequestBlockContext => Set.empty
case bc: RepositoryRequestBlockContext => bc.repositories
case bc: SnapshotRequestBlockContext => bc.repositories
case _: DataStreamRequestBlockContext => Set.empty
case _: TemplateRequestBlockContext => Set.empty
case _: AliasRequestBlockContext => Set.empty
case _: GeneralIndexRequestBlockContext => Set.empty
Expand All @@ -389,6 +424,26 @@ object BlockContext {
case _: GeneralNonIndexRequestBlockContext => Set.empty
case _: RepositoryRequestBlockContext => Set.empty
case bc: SnapshotRequestBlockContext => bc.snapshots
case _: DataStreamRequestBlockContext => Set.empty
case _: TemplateRequestBlockContext => Set.empty
case _: AliasRequestBlockContext => Set.empty
case _: GeneralIndexRequestBlockContext => Set.empty
case _: MultiIndexRequestBlockContext => Set.empty
case _: FilterableRequestBlockContext => Set.empty
case _: FilterableMultiRequestBlockContext => Set.empty
case _: RorApiRequestBlockContext => Set.empty
}
}
}

implicit class DataStreamsFromBlockContext(val blockContext: BlockContext) extends AnyVal {
def dataStreams: Set[DataStreamName] = {
blockContext match {
case _: CurrentUserMetadataRequestBlockContext => Set.empty
case _: GeneralNonIndexRequestBlockContext => Set.empty
case _: RepositoryRequestBlockContext => Set.empty
case _: SnapshotRequestBlockContext => Set.empty
case bc: DataStreamRequestBlockContext => bc.dataStreams
case _: TemplateRequestBlockContext => Set.empty
case _: AliasRequestBlockContext => Set.empty
case _: GeneralIndexRequestBlockContext => Set.empty
Expand All @@ -407,6 +462,7 @@ object BlockContext {
case _: GeneralNonIndexRequestBlockContext => None
case _: RepositoryRequestBlockContext => None
case _: SnapshotRequestBlockContext => None
case _: DataStreamRequestBlockContext => None
case bc: TemplateRequestBlockContext => Some(bc.templateOperation)
case _: AliasRequestBlockContext => None
case _: GeneralIndexRequestBlockContext => None
Expand All @@ -425,6 +481,7 @@ object BlockContext {
case _: GeneralNonIndexRequestBlockContext => None
case _: RepositoryRequestBlockContext => None
case _: SnapshotRequestBlockContext => None
case _: DataStreamRequestBlockContext => None
case _: TemplateRequestBlockContext => None
case _: AliasRequestBlockContext => None
case _: GeneralIndexRequestBlockContext => None
Expand Down Expand Up @@ -454,6 +511,7 @@ object BlockContext {
case _: GeneralNonIndexRequestBlockContext => hasIndices[GeneralNonIndexRequestBlockContext]
case _: RepositoryRequestBlockContext => hasIndices[RepositoryRequestBlockContext]
case _: SnapshotRequestBlockContext => hasIndices[SnapshotRequestBlockContext]
case _: DataStreamRequestBlockContext => hasIndices[DataStreamRequestBlockContext]
case _: AliasRequestBlockContext => hasIndices[AliasRequestBlockContext]
case _: TemplateRequestBlockContext => hasIndices[TemplateRequestBlockContext]
case _: GeneralIndexRequestBlockContext => hasIndices[GeneralIndexRequestBlockContext]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,30 @@ object BlockContextUpdater {
blockContext.copy(filteredIndices = indices)
}

implicit object DataStreamRequestBlockContextUpdater
extends BlockContextUpdater[DataStreamRequestBlockContext] {

override def emptyBlockContext(blockContext: DataStreamRequestBlockContext): DataStreamRequestBlockContext =
DataStreamRequestBlockContext(blockContext.requestContext, UserMetadata.empty, Set.empty, List.empty, Set.empty, DataStreamRequestBlockContext.BackingIndices.IndicesNotInvolved)

override def withUserMetadata(blockContext: DataStreamRequestBlockContext,
userMetadata: UserMetadata): DataStreamRequestBlockContext =
blockContext.copy(userMetadata = userMetadata)

override def withAddedResponseHeader(blockContext: DataStreamRequestBlockContext,
header: Header): DataStreamRequestBlockContext =
blockContext.copy(responseHeaders = blockContext.responseHeaders + header)

override def withAddedResponseTransformation(blockContext: DataStreamRequestBlockContext,
responseTransformation: ResponseTransformation): DataStreamRequestBlockContext =
blockContext.copy(responseTransformations = responseTransformation :: blockContext.responseTransformations)

def withDataStreams(blockContext: DataStreamRequestBlockContext,
dataStreams: Set[DataStreamName]): DataStreamRequestBlockContext = {
blockContext.copy(dataStreams = dataStreams)
}
}

implicit object TemplateRequestBlockContextUpdater
extends BlockContextUpdater[TemplateRequestBlockContext] {

Expand Down Expand Up @@ -285,7 +309,7 @@ object BlockContextUpdater {
}
}

abstract class BlockContextWithIndicesUpdater[B <: BlockContext: HasIndices] {
abstract class BlockContextWithIndicesUpdater[B <: BlockContext : HasIndices] {

def withIndices(blockContext: B, filteredIndices: Set[ClusterIndexName], allAllowedIndices: Set[ClusterIndexName]): B
}
Expand Down Expand Up @@ -319,6 +343,18 @@ object BlockContextWithIndicesUpdater {
allAllowedIndices: Set[ClusterIndexName]): SnapshotRequestBlockContext =
blockContext.copy(filteredIndices = filteredIndices, allAllowedIndices = allAllowedIndices)
}

implicit object DataStreamRequestBlocContextWithIndicesUpdater
extends BlockContextWithIndicesUpdater[DataStreamRequestBlockContext] {

def withIndices(blockContext: DataStreamRequestBlockContext,
filteredIndices: Set[ClusterIndexName],
allAllowedIndices: Set[ClusterIndexName]): DataStreamRequestBlockContext =
blockContext.copy(backingIndices = DataStreamRequestBlockContext.BackingIndices.IndicesInvolved(
filteredIndices = filteredIndices,
allAllowedIndices = allAllowedIndices
))
}
}

abstract class BlockContextWithIndexPacksUpdater[B <: BlockContext : HasIndexPacks] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ object ImpersonationWarning {

implicit val actionsRule: ImpersonationWarningSupport[ActionsRule] = NotSupported
implicit val apiKeyRule: ImpersonationWarningSupport[ApiKeysRule] = NotSupported
implicit val dataStreamsRule: ImpersonationWarningSupport[DataStreamsRule] = NotSupported
implicit val fieldsRule: ImpersonationWarningSupport[FieldsRule] = NotSupported
implicit val filterRule: ImpersonationWarningSupport[FilterRule] = NotSupported
implicit val headersAndRule: ImpersonationWarningSupport[HeadersAndRule] = NotSupported
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* This file is part of ReadonlyREST.
*
* ReadonlyREST is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* ReadonlyREST is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with ReadonlyREST. If not, see http://www.gnu.org/licenses/
*/
package tech.beshu.ror.accesscontrol.blocks.rules

import cats.data.NonEmptySet
import cats.implicits._
import monix.eval.Task
import org.apache.logging.log4j.scala.Logging
import tech.beshu.ror.accesscontrol.blocks.BlockContext.DataStreamRequestBlockContext
import tech.beshu.ror.accesscontrol.blocks.rules.DataStreamsRule.Settings
import tech.beshu.ror.accesscontrol.blocks.rules.base.Rule
import tech.beshu.ror.accesscontrol.blocks.rules.base.Rule.RuleResult.{Fulfilled, Rejected}
import tech.beshu.ror.accesscontrol.blocks.rules.base.Rule.{RegularRule, RuleName, RuleResult}
import tech.beshu.ror.accesscontrol.blocks.variables.runtime.RuntimeMultiResolvableVariable
import tech.beshu.ror.accesscontrol.blocks.{BlockContext, BlockContextUpdater}
import tech.beshu.ror.accesscontrol.domain.DataStreamName
import tech.beshu.ror.accesscontrol.matchers.ZeroKnowledgeDataStreamsFilterScalaAdapter.CheckResult
import tech.beshu.ror.accesscontrol.matchers.{MatcherWithWildcardsScalaAdapter, ZeroKnowledgeDataStreamsFilterScalaAdapter}
import tech.beshu.ror.accesscontrol.request.RequestContext
import tech.beshu.ror.accesscontrol.utils.RuntimeMultiResolvableVariableOps.resolveAll
import tech.beshu.ror.utils.ZeroKnowledgeIndexFilter

class DataStreamsRule(val settings: Settings)
extends RegularRule
with Logging {

override val name: Rule.Name = DataStreamsRule.Name.name

private val zeroKnowledgeMatchFilter = new ZeroKnowledgeDataStreamsFilterScalaAdapter(
new ZeroKnowledgeIndexFilter(true)
)

override def regularCheck[B <: BlockContext : BlockContextUpdater](blockContext: B): Task[Rule.RuleResult[B]] = Task {
BlockContextUpdater[B] match {
case BlockContextUpdater.DataStreamRequestBlockContextUpdater =>
checkDataStreams(blockContext)
case _ =>
Fulfilled(blockContext)
}
}

private def checkDataStreams(blockContext: DataStreamRequestBlockContext): RuleResult[DataStreamRequestBlockContext] = {
checkAllowedDataStreams(
resolveAll(settings.allowedDataStreams.toNonEmptyList, blockContext).toSet,
blockContext.dataStreams,
blockContext.requestContext
) match {
case Right(filteredDataStreams) => Fulfilled(blockContext.withDataStreams(filteredDataStreams))
case Left(()) => Rejected()
}
}

private def checkAllowedDataStreams(allowedDataStreams: Set[DataStreamName],
dataStreamsToCheck: Set[DataStreamName],
requestContext: RequestContext) = {
if (allowedDataStreams.contains(DataStreamName.All) || allowedDataStreams.contains(DataStreamName.Wildcard)) {
Right(dataStreamsToCheck)
} else {
zeroKnowledgeMatchFilter.check(
dataStreamsToCheck,
MatcherWithWildcardsScalaAdapter.create(allowedDataStreams)
) match {
case CheckResult.Ok(processedDataStreams) if requestContext.isReadOnlyRequest =>
Right(processedDataStreams)
case CheckResult.Ok(processedDataStreams) if processedDataStreams.size === dataStreamsToCheck.size =>
Right(processedDataStreams)
case CheckResult.Ok(processedDataStreams) =>
val filteredOutDataStreams = dataStreamsToCheck.diff(processedDataStreams).map(_.show)
logger.debug(
s"[${requestContext.id.show}] Write request with data streams cannot proceed because some of the data streams " +
s"[${filteredOutDataStreams.toList.mkString_(",")}] were filtered out by ACL. The request will be rejected.."
)
Left(())
case CheckResult.Failed =>
logger.debug(s"[${requestContext.id.show}] The processed data streams do not match the allowed data streams. The request will be rejected..")
Left(())
}
}
}
}

object DataStreamsRule {
implicit case object Name extends RuleName[DataStreamsRule] {
override val name: Rule.Name = Rule.Name("data_streams")
}

final case class Settings(allowedDataStreams: NonEmptySet[RuntimeMultiResolvableVariable[DataStreamName]])
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class FieldsRule(val settings: Settings)
case GeneralNonIndexRequestBlockContextUpdater => RuleResult.Fulfilled(blockContext)
case RepositoryRequestBlockContextUpdater => RuleResult.Fulfilled(blockContext)
case SnapshotRequestBlockContextUpdater => RuleResult.Fulfilled(blockContext)
case DataStreamRequestBlockContextUpdater => RuleResult.Fulfilled(blockContext)
case TemplateRequestBlockContextUpdater => RuleResult.Fulfilled(blockContext)
case GeneralIndexRequestBlockContextUpdater => RuleResult.Fulfilled(blockContext)
case MultiIndexRequestBlockContextUpdater => RuleResult.Fulfilled(blockContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class FilterRule(val settings: Settings)
case GeneralNonIndexRequestBlockContextUpdater => Fulfilled(blockContext)
case RepositoryRequestBlockContextUpdater => Fulfilled(blockContext)
case SnapshotRequestBlockContextUpdater => Fulfilled(blockContext)
case DataStreamRequestBlockContextUpdater => Fulfilled(blockContext)
case TemplateRequestBlockContextUpdater => Fulfilled(blockContext)
case GeneralIndexRequestBlockContextUpdater => Fulfilled(blockContext)
case AliasRequestBlockContextUpdater => Fulfilled(blockContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tech.beshu.ror.accesscontrol.blocks.rules
import cats.data.NonEmptySet
import cats.implicits._
import monix.eval.Task
import org.apache.logging.log4j.scala.Logging
import tech.beshu.ror.accesscontrol.blocks.BlockContext.{RepositoryRequestBlockContext, _}
import tech.beshu.ror.accesscontrol.blocks.rules.RepositoriesRule.Settings
import tech.beshu.ror.accesscontrol.blocks.rules.base.Rule
Expand All @@ -34,7 +35,8 @@ import tech.beshu.ror.accesscontrol.utils.RuntimeMultiResolvableVariableOps.reso
import tech.beshu.ror.utils.ZeroKnowledgeIndexFilter

class RepositoriesRule(val settings: Settings)
extends RegularRule {
extends RegularRule
with Logging {

override val name: Rule.Name = RepositoriesRule.Name.name

Expand Down Expand Up @@ -87,7 +89,15 @@ class RepositoriesRule(val settings: Settings)
Right(processedRepositories)
case CheckResult.Ok(processedRepositories) if processedRepositories.size === repositoriesToCheck.size =>
Right(processedRepositories)
case CheckResult.Ok(_) | CheckResult.Failed =>
case CheckResult.Ok(processedRepositories) =>
val filteredOutRepositories = repositoriesToCheck.diff(processedRepositories).map(_.show)
logger.debug(
s"[${requestContext.id.show}] Write request with repositories cannot proceed because some of the repositories " +
s"[${filteredOutRepositories.toList.mkString_(",")}] were filtered out by ACL. The request will be rejected.."
)
Left(())
case CheckResult.Failed =>
logger.debug(s"[${requestContext.id.show}] The processed repositories do not match the allowed repositories. The request will be rejected..")
Left(())
}
}
Expand Down
Loading

0 comments on commit d034631

Please sign in to comment.