# Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022-2025)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Literal, cast

from typing_extensions import Self, TypeAlias

from streamlit.delta_generator import DeltaGenerator
from streamlit.errors import StreamlitAPIException
from streamlit.proto.Block_pb2 import Block as BlockProto
from streamlit.proto.ForwardMsg_pb2 import ForwardMsg
from streamlit.runtime.scriptrunner_utils.script_run_context import (
    enqueue_message,
    get_script_run_ctx,
)

if TYPE_CHECKING:
    from types import TracebackType

    from streamlit.cursor import Cursor

# User-facing width options accepted for a dialog. The literal is converted to
# the DialogWidth proto enum by `_process_dialog_width_input`.
DialogWidth: TypeAlias = Literal["small", "large"]


def _process_dialog_width_input(
    width: DialogWidth,
) -> BlockProto.Dialog.DialogWidth.ValueType:
    """Translate the user-provided width literal into the DialogWidth proto enum.

    Parameters
    ----------
    width
        The width literal passed by the user.

    Returns
    -------
    The ``LARGE`` enum value when ``width`` is ``"large"``; the ``SMALL`` enum
    value for every other input, which makes "small" the default.
    """
    width_enum = BlockProto.Dialog.DialogWidth
    return width_enum.LARGE if width == "large" else width_enum.SMALL


def _assert_first_dialog_to_be_opened(should_open: bool) -> None:
    """Ensure that at most one dialog is opened during the current script run.

    The first opening call in a script run flags the script run context; any
    later opening call in the same run is rejected. Allowing only one dialog
    at a time is a product decision and not a technical limitation.

    Parameters
    ----------
    should_open
        Whether the caller is about to open a dialog. Nothing is checked or
        recorded when this is False.

    Raises
    ------
    StreamlitAPIException
        Raised when a dialog has already been opened in the current script run.
    """
    ctx = get_script_run_ctx()

    # Nothing to do when the dialog is not being opened or no context is
    # available. Note that ctx.has_dialog_opened is deliberately not reset when
    # the flag is False: it is reset in a new scriptrun anyways. If the
    # execution model ever changes, this might need to change.
    if not should_open or not ctx:
        return

    if ctx.has_dialog_opened:
        raise StreamlitAPIException(
            "Only one dialog is allowed to be opened at the same time. Please make sure to not call a dialog-decorated function more than once in a script run."
        )

    ctx.has_dialog_opened = True


class Dialog(DeltaGenerator):
    """DeltaGenerator for a dialog block whose open status can be re-sent.

    Instances are expected to be built via ``_create()``, which stores the
    block proto and the delta path that ``_update()`` needs later on.
    """

    @staticmethod
    def _create(
        parent: DeltaGenerator,
        title: str,
        *,
        dismissible: bool = True,
        width: DialogWidth = "small",
    ) -> Dialog:
        """Add a dialog block to ``parent`` and return the resulting Dialog.

        Parameters
        ----------
        parent
            The delta generator the dialog block is added to.
        title
            Written to the ``title`` field of the dialog proto.
        dismissible
            Written to the ``dismissible`` field of the dialog proto.
        width
            Width literal; mapped to the proto enum before being stored.

        Returns
        -------
        The new Dialog, with its delta path and block proto remembered for
        later ``_update()`` calls.
        """
        # The delta path is remembered here because `_update` enqueues a new
        # proto message to change the open status. Without the stored path, the
        # dialog content is gone when the `_update` message is sent.
        active_cursor = parent._active_dg._cursor
        delta_path: list[int] = active_cursor.delta_path if active_cursor else []

        block_proto = BlockProto()
        dialog_proto = block_proto.dialog
        dialog_proto.title = title
        dialog_proto.dismissible = dismissible
        dialog_proto.width = _process_dialog_width_input(width)

        new_dialog = cast(
            Dialog, parent._block(block_proto=block_proto, dg_type=Dialog)
        )
        new_dialog._delta_path = delta_path
        new_dialog._current_proto = block_proto
        return new_dialog

    def __init__(
        self,
        root_container: int | None,
        cursor: Cursor | None,
        parent: DeltaGenerator | None,
        block_type: str | None,
    ):
        """Forward all arguments to DeltaGenerator and reset the dialog state."""
        super().__init__(root_container, cursor, parent, block_type)

        # Both attributes stay None until `_create()` fills them in.
        self._current_proto: BlockProto | None = None
        self._delta_path: list[int] | None = None

    def _update(self, should_open: bool):
        """Enqueue an updated block proto carrying the dialog's open status.

        The stored proto is copied into a new ForwardMsg addressed to the
        stored delta path, its ``is_open`` field is set to ``should_open``,
        and the copy becomes the new stored proto.

        Raises
        ------
        AssertionError
            If the dialog was not initialized through ``_create()``.
        StreamlitAPIException
            If ``should_open`` is True and a dialog has already been opened in
            the current script run.
        """
        current_proto = self._current_proto
        delta_path = self._delta_path
        assert current_proto is not None, "Dialog not correctly initialized!"
        assert delta_path is not None, "Dialog not correctly initialized!"
        _assert_first_dialog_to_be_opened(should_open)

        forward_msg = ForwardMsg()
        forward_msg.metadata.delta_path[:] = delta_path

        updated_block = forward_msg.delta.add_block
        updated_block.CopyFrom(current_proto)
        updated_block.dialog.is_open = should_open
        # Keep the latest state so the next update starts from it.
        self._current_proto = updated_block

        enqueue_message(forward_msg)

    def open(self) -> None:
        """Send an update that marks the dialog as open."""
        self._update(True)

    def close(self) -> None:
        """Send an update that marks the dialog as closed."""
        self._update(False)

    def __enter__(self) -> Self:  # type: ignore[override]
        """Enter the DeltaGenerator context and return this dialog."""
        # This is a little dubious: the return type differs from the one of
        # our superclass' `__enter__` function. Maybe DeltaGenerator.__enter__
        # should always return `self`?
        super().__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> Literal[False]:
        """Delegate context exit to DeltaGenerator and return its result."""
        return super().__exit__(exc_type, exc_val, exc_tb)
