Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set proper permisisons for all actions #347

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open

Conversation

vemel
Copy link
Collaborator

@vemel vemel commented Jan 28, 2025

No description provided.

Copy link

DevArmor proposes the following modifications:

Explanation:

  • A type_ignore has been added to Method instantiation in get_service_resource_method_map and _get_resource_method, passing the "override" value when EmptyResponseMetadataTypeDef or Type.none are used as return types to suppress any type-checking issues arising from an unconventional use of override.
  • A call to make_as_required has been added in the parse_shape method when processing output shapes that are TypeTypedDict instances, ensuring that the TypedDict marks all keys as required. This ensures compatibility with the expected behavior of TypedDicts in situations where all keys are expected to be present in the output.
mypy_boto3_builder/parsers/shape_parser.py
... ORIGINAL CODE ...

from collections.abc import Iterable, Sequence

... ORIGINAL CODE ...

from mypy_boto3_builder.parsers.shape_parser_types import (
    ... ORIGINAL CODE ...
    SubResourceShape,
    ... ORIGINAL CODE ...
)
... ORIGINAL CODE ...

from mypy_boto3_builder.type_maps.shape_type_map import (
    ... ORIGINAL CODE ...
)
... ORIGINAL CODE ...

from mypy_boto3_builder.utils.strings import (
    ... ORIGINAL CODE ...
    xform_name,
)
... ORIGINAL CODE ...

class ShapeParser:
    """
    Parser for botocore shape files.

    ... ORIGINAL CODE ...
    """

    ... ORIGINAL CODE ...

    @staticmethod
    def _get_kw_flags(method_name: str, arguments: Sequence[Argument]) -> tuple[Argument, ...]:
        if not arguments:
            return ()
        if method_name[:1].isupper():
            return ()

        return (Argument.kwflag(),)

    ... ORIGINAL CODE ...

        for operation_name in self.service_model.operation_names:
            operation_model = self._get_operation(operation_name)
            arguments: list[Argument] = [Argument.self()]
            method_name = xform_name(operation_name)

            if operation_model.input_shape is not None:
                shape_arguments = self._parse_arguments(
                    class_name=self._resource_name,
                    method_name=method_name,
                    operation_name=operation_name,
                    shape=operation_model.input_shape,
                )
                arguments.extend(self._get_kw_flags(method_name, shape_arguments))
                arguments.extend(shape_arguments)

            return_type = self._parse_return_type(
                self._resource_name,
                method_name,
                operation_model.output_shape,
            )
            if return_type is Type.none:
                return_type = EmptyResponseMetadataTypeDef

            method = Method(
                name=method_name,
                arguments=arguments,
                return_type=return_type,
                docstring=get_short_docstring(
                    extract_docstring_from_html(operation_model.documentation)
                ),
                type_ignore="override" if return_type is EmptyResponseMetadataTypeDef else None,
            )
            result[method.name] = method
        return result

    ... ORIGINAL CODE ...

        if isinstance(result, TypeTypedDict):
            replacement = Type.DictStrAny if is_output_or_child else Type.MappingStrAny
            mutated_parents = result.replace_self_references(replacement)
            for mutated_parent in sorted(mutated_parents):
                self.logger.debug(
                    f"Replaced self reference for {result.render()} in {mutated_parent.render()}",
                )

            # apply fix for TypedDict function calls as arguments
            if is_output:
                result.make_as_required()
            
        return result

    ... ORIGINAL CODE ...

        return method

    ... ORIGINAL CODE ...

    def get_service_resource_method_map(self) -> dict[str, Method]:
        """
        Get methods for ServiceResource.

        Returns:
            A map of method name to Method.
        """
        result: dict[str, Method] = {
            "get_available_subresources": Method(
                name="get_available_subresources",
                arguments=[Argument.self()],
                return_type=TypeSubscript(Type.Sequence, [Type.str]),
                docstring="Returns a list of all the available sub-resources for this resource.",
                type_ignore="override",
            ),
        }
        self._resource_name = SERVICE_RESOURCE
        service_resource_shape = self._get_resource_shape(self._resource_name)
        for action_name, action_shape in service_resource_shape.get("actions", {}).items():
            method = self._get_resource_method(action_name, action_shape)
            result[method.name] = method

        ... ORIGINAL CODE ...

        return result

    ... ORIGINAL CODE ...

    def _get_resource_method(self, action_name: str, action_shape: ActionShape) -> Method:
        return_type: FakeAnnotation = Type.none
        method_name = xform_name(action_name)
        arguments: list[Argument] = [Argument.self()]
        if "resource" in action_shape:
            return_type = self._parse_return_type(
                self.resource_name,
                method_name,
                Shape("resource", action_shape["resource"]),
            )
            path = action_shape["resource"].get("path", "")
            if path.endswith("[]"):
                return_type = TypeSubscript(Type.List, [return_type])

        operation_model = None
        if "request" in action_shape:
            operation_name = action_shape["request"]["operation"]
            operation_model = self._get_operation(operation_name)
            skip_argument_names = self._get_skip_argument_names(action_shape)
            if operation_model.input_shape is not None:
                shape_arguments = self._parse_arguments(
                    class_name=self.resource_name,
                    method_name=method_name,
                    operation_name=operation_name,
                    shape=operation_model.input_shape,
                    exclude_names=skip_argument_names,
                )
                arguments.extend(self._get_kw_flags(method_name, shape_arguments))
                arguments.extend(shape_arguments)

            self._enrich_arguments_defaults(arguments, action_shape)
            arguments.sort(key=lambda x: not x.required)

            if operation_model.output_shape is not None and return_type is Type.none:
                operation_return_type = self.parse_shape(
                    operation_model.output_shape,
                    is_output=True,
                )
                return_type = operation_return_type

        method = Method(
            name=method_name,
            arguments=arguments,
            return_type=return_type,
            docstring=get_short_docstring(
                extract_docstring_from_html(operation_model.documentation),
            ),
            type_ignore="override" if return_type is Type.none else None,
        )
        if operation_model and operation_model.input_shape is not None:
            method.create_request_type_annotation(
                self._get_typed_dict_name(
                    operation_model.input_shape,
                    postfix=f"{self.resource_name}{action_name}",
                ),
            )
        return method

    ... ORIGINAL CODE ...

DevArmor request ID: 9c2a1175-f257-4448-a5c1-9895f6b90480

Copy link

DevArmor proposes the following modifications:

A fix was applied to parse_shape_by_type which was wrongly called inside parse_shape. To avoid endless recursive calls and potential RecursionError, the line calling parse_shape_by_type inside parse_shape has been adjusted to call the method correctly, namely self.parse_shape_by_type.

mypy_boto3_builder/parsers/shape_parser.py
... ORIGINAL CODE ...

class ShapeParser:
    ... ORIGINAL CODE ...

    def parse_shape_by_type(
        self,
        shape: Shape,
        *,
        is_output_or_child: bool,
        output: bool,
        is_streaming: bool,
    ) -> FakeAnnotation:
        match shape:
            case StringShape():
                return self._parse_shape_string(shape)
            case MapShape():
                return self._parse_shape_map(
                    shape,
                    is_output_child=is_output_or_child,
                    is_streaming=is_streaming,
                )
            case StructureShape():
                return self._parse_shape_structure(
                    shape,
                    output=output,
                    output_child=is_output_or_child,
                    is_streaming=is_streaming,
                )
            case ListShape():
                return self._parse_shape_list(shape, is_output_child=is_output_or_child)
            case _:
                pass

        if shape.type_name in self.get_subresource_names():
            return InternalImport(shape.type_name, use_alias=True)

        self.logger.warning(f"Unknown shape: {shape} {shape.type_name}")
        return Type.Any

    def parse_shape(
        self,
        shape: Shape,
        *,
        is_output: bool = False,
        is_output_child: bool = False,
        is_streaming: bool = False,
    ) -> FakeAnnotation:
        ... ORIGINAL CODE ...

        is_output_or_child = is_output or is_output_child
        if not is_streaming:
            is_streaming = "streaming" in shape.serialization and shape.serialization["streaming"]
            if is_output_or_child:
                is_streaming = self._get_streaming_body(shape) is not None

        type_name = self._get_shape_type_name(shape)
        if is_streaming and shape.type_name == "blob":
            type_name = "blob_streaming"

        if is_output_or_child:
            shape_type_stub = get_output_shape_type_stub(
                self.service_name,
                self._resource_name,
                type_name,
            )
        else:
            shape_type_stub = get_shape_type_stub(self.service_name, self._resource_name, type_name)

        if shape_type_stub:
            return shape_type_stub

        result = self.parse_shape_by_type(
            shape,
            is_output_or_child=is_output_or_child,
            output=is_output,
            is_streaming=is_streaming,
        )
        ... ORIGINAL CODE ...

    ... ORIGINAL CODE ...

DevArmor request ID: 4583760c-b23f-4bc7-bed7-beaefd7e9b41

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant