# copied and adapted from https://github.com/getsentry/sentry-python/blob/269d96d6e9821122fbff280e6a26956e5ed03c0b/sentry_sdk/utils.py#L689
# 💖open source (under MIT License)
# We want to keep payloads as similar to Sentry as possible for easy interoperability

import linecache
import os
import re
import sys
from datetime import datetime
from typing import TYPE_CHECKING

try:
    # Python 3.11
    from builtins import BaseExceptionGroup
except ImportError:
    # Python 3.10 and below
    BaseExceptionGroup = None  # type: ignore


# Fallback size limit used when a caller does not pass an explicit maximum value length.
DEFAULT_MAX_VALUE_LENGTH = 1024


# Everything in this branch exists only for static type checkers; none of it
# is executed at runtime, so the names below may only be used inside type
# comments or other TYPE_CHECKING-guarded code.
if TYPE_CHECKING:

    from types import FrameType, TracebackType
    from typing import (  # noqa: F401
        Any,
        Callable,
        Dict,
        Iterator,
        List,
        Literal,
        Optional,
        Set,
        Tuple,
        Type,
        TypedDict,
        TypeVar,
        Union,
        cast,
    )

    # Shape of a ``sys.exc_info()`` result: either a fully populated triple
    # (the traceback may still be missing) or three ``None`` values.
    ExcInfo = Union[
        Tuple[Type[BaseException], BaseException, Optional[TracebackType]],
        Tuple[None, None, None],
    ]
    LogLevelStr = Literal["fatal", "critical", "error", "warning", "info", "debug"]

    # Event payload dictionary. ``total=False`` makes every key optional.
    Event = TypedDict(
        "Event",
        {
            "breadcrumbs": Dict[Literal["values"], List[Dict[str, Any]]],  # TODO: We can expand on this type
            "check_in_id": str,
            "contexts": Dict[str, Dict[str, object]],
            "dist": str,
            "duration": Optional[float],
            "environment": str,
            "errors": List[Dict[str, Any]],  # TODO: We can expand on this type
            "event_id": str,
            "exception": Dict[Literal["values"], List[Dict[str, Any]]],  # TODO: We can expand on this type
            # "extra": MutableMapping[str, object],
            # "fingerprint": List[str],
            "level": LogLevelStr,
            # "logentry": Mapping[str, object],
            "logger": str,
            # "measurements": Dict[str, MeasurementValue],
            "message": str,
            "modules": Dict[str, str],
            # "monitor_config": Mapping[str, object],
            "monitor_slug": Optional[str],
            "platform": Literal["python"],
            "profile": object,  # Should be sentry_sdk.profiler.Profile, but we can't import that here due to circular imports
            "release": str,
            "request": Dict[str, object],
            # "sdk": Mapping[str, object],
            "server_name": str,
            "spans": List[Dict[str, object]],
            "stacktrace": Dict[str, object],  # We access this key in the code, but I am unsure whether we ever set it
            "start_timestamp": datetime,
            "status": Optional[str],
            # "tags": MutableMapping[
            #     str, str
            # ],  # Tags must be less than 200 characters each
            "threads": Dict[Literal["values"], List[Dict[str, Any]]],  # TODO: We can expand on this type
            "timestamp": Optional[datetime],  # Must be set before sending the event
            "transaction": str,
            # "transaction_info": Mapping[str, Any],  # TODO: We can expand on this type
            "type": Literal["check_in", "transaction"],
            "user": Dict[str, object],
            "_metrics_summary": Dict[str, object],
        },
        total=False,
    )


# Naive datetime for the Unix epoch; to_timestamp() subtracts it from its argument.
epoch = datetime(1970, 1, 1)


# Matches strings made up only of base64 alphabet characters and "=" (the empty string also matches).
BASE64_ALPHABET = re.compile(r"^[a-zA-Z0-9/+=]*$")

# Placeholder text stored in place of a value that was removed for being sensitive.
SENSITIVE_DATA_SUBSTITUTE = "[Filtered]"


def to_timestamp(value):
    # type: (datetime) -> float
    """Return the number of seconds between the module-level ``epoch`` and ``value``."""
    elapsed = value - epoch
    return elapsed.total_seconds()


def format_timestamp(value):
    # type: (datetime) -> str
    """Render ``value`` as ``YYYY-MM-DDTHH:MM:SS.ffffffZ``.

    The trailing ``Z`` is a literal; no timezone conversion is performed.
    """
    layout = "%Y-%m-%dT%H:%M:%S.%fZ"
    return value.strftime(layout)


def event_hint_with_exc_info(exc_info=None):
    # type: (Optional[ExcInfo]) -> Dict[str, Optional[ExcInfo]]
    """Build a hint dictionary whose ``exc_info`` key holds the exception triple.

    Without an argument the exception currently being handled (``sys.exc_info()``)
    is used; otherwise the argument is normalised through ``exc_info_from_error``.
    When the resulting triple carries no exception type, ``None`` is stored instead.
    """
    resolved = sys.exc_info() if exc_info is None else exc_info_from_error(exc_info)
    return {"exc_info": None if resolved[0] is None else resolved}


class AnnotatedValue:
    """
    Meta information for a data field in the event payload.
    This is to tell Relay that we have tampered with the fields value.
    See:
    https://github.com/getsentry/relay/blob/be12cd49a0f06ea932ed9b9f93a655de5d6ad6d1/relay-general/src/types/meta.rs#L407-L423

    Attributes:
        value: the (possibly replaced or truncated) field value.
        metadata: dictionary describing what was done to the original value.
    """

    __slots__ = ("value", "metadata")

    def __init__(self, value, metadata):
        # type: (Optional[Any], Dict[str, Any]) -> None
        self.value = value
        self.metadata = metadata

    def __repr__(self):
        # type: () -> str
        return "%s(value=%r, metadata=%r)" % (type(self).__name__, self.value, self.metadata)

    def __eq__(self, other):
        # type: (Any) -> bool
        """Two annotated values are equal when both value and metadata match."""
        if not isinstance(other, AnnotatedValue):
            return False

        return self.value == other.value and self.metadata == other.metadata

    # The alternate constructors below instantiate ``cls`` rather than the
    # hard-coded base class, so subclasses get instances of their own type.

    @classmethod
    def removed_because_raw_data(cls):
        # type: () -> AnnotatedValue
        """The value was removed because it could not be parsed. This is done for request body values that are not json nor a form."""
        return cls(
            value="",
            metadata={
                "rem": [  # Remark
                    [
                        "!raw",  # Unparsable raw data
                        "x",  # The fields original value was removed
                    ]
                ]
            },
        )

    @classmethod
    def removed_because_over_size_limit(cls):
        # type: () -> AnnotatedValue
        """The actual value was removed because the size of the field exceeded the configured maximum size (specified with the max_request_body_size sdk option)"""
        return cls(
            value="",
            metadata={
                "rem": [  # Remark
                    [
                        "!config",  # Because of configured maximum size
                        "x",  # The fields original value was removed
                    ]
                ]
            },
        )

    @classmethod
    def substituted_because_contains_sensitive_data(cls):
        # type: () -> AnnotatedValue
        """The actual value was removed because it contained sensitive information."""
        return cls(
            value=SENSITIVE_DATA_SUBSTITUTE,
            metadata={
                "rem": [  # Remark
                    [
                        "!config",  # Because of SDK configuration (in this case the config is the hard coded removal of certain django cookies)
                        "s",  # The fields original value was substituted
                    ]
                ]
            },
        )


if TYPE_CHECKING:
    # Type-checker-only aliases: Annotated[T] is either a plain T or an
    # AnnotatedValue standing in for one.
    T = TypeVar("T")
    Annotated = Union[AnnotatedValue, T]


def get_type_name(cls):
    # type: (Optional[type]) -> Optional[str]
    """Return ``cls.__qualname__`` when set and non-empty, else ``cls.__name__``, else ``None``."""
    qualified = getattr(cls, "__qualname__", None)
    if qualified:
        return qualified
    return getattr(cls, "__name__", None)


def get_type_module(cls):
    # type: (Optional[type]) -> Optional[str]
    """Return the module name of ``cls``, or ``None`` for builtins or when unavailable."""
    module_name = getattr(cls, "__module__", None)
    if module_name in (None, "builtins", "__builtins__"):
        return None
    return module_name


def should_hide_frame(frame: "FrameType") -> bool:
    """Decide whether ``frame`` should be left out of reported stack traces.

    A frame is hidden when its module name starts with ``sentry_sdk.`` or when
    one of the local variables ``__traceback_hide__`` / ``__tracebackhide__``
    is truthy. Lookup failures are treated as "not hidden".
    """
    try:
        module_name = frame.f_globals["__name__"]
        belongs_to_sdk = module_name.startswith("sentry_sdk.")
    except (AttributeError, KeyError):
        belongs_to_sdk = False

    if belongs_to_sdk:
        return True

    for marker in ("__traceback_hide__", "__tracebackhide__"):
        try:
            # Truthiness is evaluated inside the try so a misbehaving local
            # cannot break frame collection.
            flagged = bool(frame.f_locals[marker])
        except Exception:
            continue
        if flagged:
            return True

    return False


def iter_stacks(tb):
    # type: (Optional[TracebackType]) -> Iterator[TracebackType]
    """Walk the ``tb_next`` chain starting at ``tb``, yielding every entry whose frame is not hidden."""
    cursor = tb  # type: Optional[TracebackType]
    while cursor is not None:
        visible = not should_hide_frame(cursor.tb_frame)
        if visible:
            yield cursor
        cursor = cursor.tb_next


def get_lines_from_file(
    filename,  # type: str
    lineno,  # type: int
    max_length=None,  # type: Optional[int]
    loader=None,  # type: Optional[Any]
    module=None,  # type: Optional[str]
):
    # type: (...) -> Tuple[List[Annotated[str]], Optional[Annotated[str]], List[Annotated[str]]]
    """Return the source line at ``lineno`` together with surrounding context.

    Args:
        filename: path handed to ``linecache`` when the loader yields no source.
        lineno: zero-based index of the line of interest.
        max_length: per-line size limit forwarded to ``strip_string``.
        loader: optional object with a ``get_source(module)`` method, tried first.
        module: module name passed to ``loader.get_source``.

    Returns:
        ``(pre_context, context_line, post_context)`` with up to five lines on
        each side, or ``([], None, [])`` when the source or line is unavailable.
    """
    # A negative index would silently wrap around and report lines from the
    # end of the file, so treat it as "no source available".
    if lineno < 0:
        return [], None, []

    context_lines = 5
    source = None
    if loader is not None and hasattr(loader, "get_source"):
        try:
            source_str = loader.get_source(module)  # type: Optional[str]
        except (ImportError, IOError):
            source_str = None
        if source_str is not None:
            source = source_str.splitlines()

    if source is None:
        try:
            source = linecache.getlines(filename)
        except (OSError, IOError):
            return [], None, []

    if not source:
        return [], None, []

    lower_bound = max(0, lineno - context_lines)
    upper_bound = min(lineno + 1 + context_lines, len(source))

    try:
        pre_context = [strip_string(line.strip("\r\n"), max_length=max_length) for line in source[lower_bound:lineno]]
        context_line = strip_string(source[lineno].strip("\r\n"), max_length=max_length)
        post_context = [
            strip_string(line.strip("\r\n"), max_length=max_length)
            for line in source[(lineno + 1) : upper_bound]  # noqa: E203
        ]
        return pre_context, context_line, post_context
    except IndexError:
        # the file may have changed since it was loaded into memory
        return [], None, []


def get_source_context(
    frame,  # type: FrameType
    tb_lineno,  # type: int
    max_value_length=None,  # type: Optional[int]
):
    # type: (...) -> Tuple[List[Annotated[str]], Optional[Annotated[str]], List[Annotated[str]]]
    """Return the source line for ``frame`` at ``tb_lineno`` plus surrounding context.

    Args:
        frame: frame whose code object and globals identify the source file.
        tb_lineno: one-based line number; ``None`` is tolerated and yields no context.
        max_value_length: per-line size limit forwarded to ``get_lines_from_file``.

    Returns:
        ``(pre_context, context_line, post_context)``, or ``([], None, [])`` when
        the module name, file path or line number cannot be determined.
    """
    try:
        abs_path = frame.f_code.co_filename  # type: Optional[str]
    except Exception:
        abs_path = None
    try:
        module = frame.f_globals["__name__"]
    except Exception:
        return [], None, []
    try:
        loader = frame.f_globals["__loader__"]
    except Exception:
        loader = None

    # Check for a missing line number *before* converting to a zero-based
    # index; subtracting from None would raise TypeError.
    if tb_lineno is None or not abs_path:
        return [], None, []

    lineno = tb_lineno - 1
    return get_lines_from_file(abs_path, lineno, max_value_length, loader=loader, module=module)


def safe_str(value):
    # type: (Any) -> str
    """Return ``str(value)``, falling back to ``safe_repr(value)`` if that raises."""
    try:
        text = str(value)
    except Exception:
        text = safe_repr(value)
    return text


def safe_repr(value):
    # type: (Any) -> str
    """Return ``repr(value)``, or the placeholder ``<broken repr>`` if that raises."""
    try:
        rendered = repr(value)
    except Exception:
        rendered = "<broken repr>"
    return rendered


def filename_for_module(module, abs_path):
    # type: (Optional[str], Optional[str]) -> Optional[str]
    """Derive a short, package-relative filename for ``abs_path``.

    For a top-level module only the basename is returned. For a dotted module
    the path is made relative to the directory containing the root package,
    as located through ``sys.modules``. ``abs_path`` is returned (with a
    trailing ``.pyc`` turned into ``.py`` where that already happened) whenever
    the computation is not possible.
    """
    if not (abs_path and module):
        return abs_path

    try:
        if abs_path.endswith(".pyc"):
            # Drop the trailing "c" so the name points at the source file.
            abs_path = abs_path[:-1]

        root_package = module.split(".", 1)[0]
        if root_package == module:
            return os.path.basename(abs_path)

        package_file = sys.modules[root_package].__file__
        if not package_file:
            return abs_path

        # Two levels up from the package's file is the directory that
        # contains the root package.
        container = package_file.rsplit(os.sep, 2)[0]
        relative = abs_path.split(container, 1)[-1]
        return relative.lstrip(os.sep)
    except Exception:
        return abs_path


def serialize_frame(
    frame,
    tb_lineno=None,
    include_local_variables=True,
    include_source_context=True,
    max_value_length=None,
    custom_repr=None,
):
    # type: (FrameType, Optional[int], bool, bool, Optional[int], Optional[Callable[..., Optional[str]]]) -> Dict[str, Any]
    """Convert a frame into the dictionary used for one stack-trace entry.

    ``tb_lineno`` defaults to the frame's current line. Source context is
    attached when ``include_source_context`` is true. Local variables are not
    serialised yet (see the TODO below), so ``include_local_variables`` and
    ``custom_repr`` currently have no effect.
    """
    code = getattr(frame, "f_code", None)
    if code:
        abs_path = frame.f_code.co_filename
        function = frame.f_code.co_name
    else:
        abs_path = function = None

    try:
        module = frame.f_globals["__name__"]
    except Exception:
        module = None

    lineno = frame.f_lineno if tb_lineno is None else tb_lineno

    serialized = {
        "platform": "python",
        "filename": filename_for_module(module, abs_path) or None,
        "abs_path": os.path.abspath(abs_path) if abs_path else None,
        "function": function or "<unknown>",
        "module": module,
        "lineno": lineno,
    }  # type: Dict[str, Any]

    if include_source_context:
        before, current, after = get_source_context(frame, lineno, max_value_length)
        serialized["pre_context"] = before
        serialized["context_line"] = current
        serialized["post_context"] = after

    if include_local_variables:
        # TODO(nk): Sort out this current invalid import
        # from sentry_sdk.serializer import serialize

        # rv["vars"] = serialize(
        #     dict(frame.f_locals), is_vars=True, custom_repr=custom_repr
        # )
        pass

    return serialized


def current_stacktrace(
    include_local_variables=True,  # type: bool
    include_source_context=True,  # type: bool
    max_value_length=None,  # type: Optional[int]
):
    # type: (...) -> Dict[str, Any]
    """Serialise the calling stack, outermost frame first.

    Frames for which ``should_hide_frame`` is true are skipped. This function's
    own frame is skipped through the ``__tracebackhide__`` local set below.
    """
    __tracebackhide__ = True
    collected = []

    frame = sys._getframe()  # type: Optional[FrameType]
    while frame is not None:
        if not should_hide_frame(frame):
            serialized = serialize_frame(
                frame,
                include_local_variables=include_local_variables,
                include_source_context=include_source_context,
                max_value_length=max_value_length,
            )
            collected.append(serialized)
        frame = frame.f_back

    # Frames were gathered innermost-first; flip them for the payload.
    return {"frames": collected[::-1], "type": "raw"}


def get_errno(exc_value):
    # type: (BaseException) -> Optional[Any]
    """Return the ``errno`` attribute of ``exc_value`` or ``None`` when it has none."""
    try:
        return exc_value.errno
    except AttributeError:
        return None


def get_error_message(exc_value):
    # type: (Optional[BaseException]) -> str
    """Pick a message for ``exc_value``.

    The first truthy value among the ``message`` and ``detail`` attributes
    wins; otherwise ``safe_str(exc_value)`` is used.
    """
    for attribute in ("message", "detail"):
        candidate = getattr(exc_value, attribute, "")
        if candidate:
            return candidate
    return safe_str(exc_value)


def single_exception_from_error_tuple(
    exc_type,  # type: Optional[type]
    exc_value,  # type: Optional[BaseException]
    tb,  # type: Optional[TracebackType]
    client_options=None,  # type: Optional[Dict[str, Any]]
    mechanism=None,  # type: Optional[Dict[str, Any]]
    exception_id=None,  # type: Optional[int]
    parent_id=None,  # type: Optional[int]
    source=None,  # type: Optional[str]
):
    # type: (...) -> Dict[str, Any]
    """
    Build one entry of the event's `exception.values` list in the format Sentry ingests.

    The entry holds the mechanism description, the exception's module, type and
    message, and (when any visible frames exist) a raw stack trace.

    See the Exception Interface documentation for more details:
    https://develop.sentry.dev/sdk/event-payloads/exception/
    """
    # Work on a copy so the caller's mechanism dictionary is never mutated.
    mechanism_data = mechanism.copy() if mechanism else {"type": "generic", "handled": True}
    if exception_id is not None:
        mechanism_data["exception_id"] = exception_id

    errno = get_errno(exc_value) if exc_value is not None else None
    if errno is not None:
        # setdefault keeps any errno metadata the caller already supplied.
        mechanism_data.setdefault("meta", {}).setdefault("errno", {}).setdefault("number", errno)

    if source is not None:
        mechanism_data["source"] = source

    if exception_id == 0:
        # Root exception: only make sure a type is present.
        if "type" not in mechanism_data:
            mechanism_data["type"] = "generic"
    elif parent_id is not None:
        mechanism_data["parent_id"] = parent_id
        mechanism_data["type"] = "chained"

    if BaseExceptionGroup is not None and isinstance(exc_value, BaseExceptionGroup):
        mechanism_data["is_exception_group"] = True

    exception_value = {
        "mechanism": mechanism_data,
        "module": get_type_module(exc_type),
        "type": get_type_name(exc_type),
        "value": get_error_message(exc_value),
    }  # type: Dict[str, Any]

    if client_options is not None:
        include_local_variables = client_options["include_local_variables"]
        include_source_context = client_options["include_source_context"]
        max_value_length = client_options["max_value_length"]
        custom_repr = client_options.get("custom_repr")
    else:
        include_local_variables = True
        include_source_context = True
        max_value_length = DEFAULT_MAX_VALUE_LENGTH  # fallback
        custom_repr = None

    frames = []
    for entry in iter_stacks(tb):
        frames.append(
            serialize_frame(
                entry.tb_frame,
                tb_lineno=entry.tb_lineno,
                include_local_variables=include_local_variables,
                include_source_context=include_source_context,
                max_value_length=max_value_length,
                custom_repr=custom_repr,
            )
        )

    if frames:
        exception_value["stacktrace"] = {"frames": frames, "type": "raw"}

    return exception_value


# True when the interpreter's exceptions carry chaining attributes.
HAS_CHAINED_EXCEPTIONS = hasattr(Exception, "__suppress_context__")

if not HAS_CHAINED_EXCEPTIONS:

    def walk_exception_chain(exc_info):
        # type: (ExcInfo) -> Iterator[ExcInfo]
        """Yield only ``exc_info`` itself; no chaining information is available."""
        yield exc_info

else:

    def walk_exception_chain(exc_info):
        # type: (ExcInfo) -> Iterator[ExcInfo]
        """Yield ``exc_info`` followed by each exception it is chained to.

        ``__cause__`` is followed when ``__suppress_context__`` is set,
        ``__context__`` otherwise. Iteration stops at the end of the chain or
        as soon as an exception is encountered a second time.
        """
        current_type, current_value, current_tb = exc_info

        # Identity is tracked by id() to avoid hashing arbitrary objects; the
        # list keeps the objects alive so an id cannot be reused meanwhile.
        kept_alive = []
        visited_ids = set()  # type: Set[int]

        while current_type is not None and current_value is not None:
            if id(current_value) in visited_ids:
                break

            yield current_type, current_value, current_tb

            kept_alive.append(current_value)
            visited_ids.add(id(current_value))

            following = (
                current_value.__cause__ if current_value.__suppress_context__ else current_value.__context__
            )
            if following is None:
                break

            current_type = type(following)
            current_value = following
            current_tb = getattr(following, "__traceback__", None)


def exceptions_from_error(
    exc_type,  # type: Optional[type]
    exc_value,  # type: Optional[BaseException]
    tb,  # type: Optional[TracebackType]
    client_options=None,  # type: Optional[Dict[str, Any]]
    mechanism=None,  # type: Optional[Dict[str, Any]]
    exception_id=0,  # type: int
    parent_id=0,  # type: int
    source=None,  # type: Optional[str]
):
    # type: (...) -> Tuple[int, List[Dict[str, Any]]]
    """
    Recursively serialise an exception together with everything attached to it.

    The result starts with the given exception, followed by its chained
    exception (`__cause__` or `__context__`) and then by the members of its
    `exceptions` attribute, if it has one.

    Returns a tuple of the next unused exception id and the list of entries.

    See the Exception Interface documentation for more details:
    https://develop.sentry.dev/sdk/event-payloads/exception/
    """
    collected = [
        single_exception_from_error_tuple(
            exc_type=exc_type,
            exc_value=exc_value,
            tb=tb,
            client_options=client_options,
            mechanism=mechanism,
            exception_id=exception_id,
            parent_id=parent_id,
            source=source,
        )
    ]

    # The entry just created becomes the parent of any group members below.
    parent_id = exception_id
    exception_id += 1

    suppress_context = hasattr(exc_value, "__suppress_context__") and exc_value.__suppress_context__  # type: ignore
    # `__cause__` is set by `raise ... from ...`; `__context__` is assigned
    # when another exception occurs while handling this one.
    link_name = "__cause__" if suppress_context else "__context__"

    has_link = exc_value and hasattr(exc_value, link_name) and getattr(exc_value, link_name) is not None
    if has_link:
        linked = getattr(exc_value, link_name)
        exception_id, nested = exceptions_from_error(
            exc_type=type(linked),
            exc_value=linked,
            tb=getattr(linked, "__traceback__", None),
            client_options=client_options,
            mechanism=mechanism,
            exception_id=exception_id,
            source=link_name,
        )
        collected.extend(nested)

    # Anything exposing an `exceptions` attribute is treated as an exception group.
    if exc_value and hasattr(exc_value, "exceptions"):
        for position, member in enumerate(exc_value.exceptions):  # type: ignore
            exception_id, nested = exceptions_from_error(
                exc_type=type(member),
                exc_value=member,
                tb=getattr(member, "__traceback__", None),
                client_options=client_options,
                mechanism=mechanism,
                exception_id=exception_id,
                parent_id=parent_id,
                source="exceptions[%s]" % position,
            )
            collected.extend(nested)

    return (exception_id, collected)


def exceptions_from_error_tuple(
    exc_info,  # type: ExcInfo
    client_options=None,  # type: Optional[Dict[str, Any]]
    mechanism=None,  # type: Optional[Dict[str, Any]]
):
    # type: (...) -> List[Dict[str, Any]]
    """Serialise ``exc_info`` and its related exceptions.

    Exception groups are expanded through ``exceptions_from_error``; every
    other exception is expanded by walking its chain. The collected entries
    are reversed before being returned.
    """
    exc_type, exc_value, tb = exc_info

    if BaseExceptionGroup is not None and isinstance(exc_value, BaseExceptionGroup):
        _, exceptions = exceptions_from_error(
            exc_type=exc_type,
            exc_value=exc_value,
            tb=tb,
            client_options=client_options,
            mechanism=mechanism,
            exception_id=0,
            parent_id=0,
        )
    else:
        exceptions = [
            single_exception_from_error_tuple(link_type, link_value, link_tb, client_options, mechanism)
            for link_type, link_value, link_tb in walk_exception_chain(exc_info)
        ]

    exceptions.reverse()
    return exceptions


def to_string(value):
    # type: (str) -> str
    """Return ``str(value)``; on ``UnicodeDecodeError`` return its ``repr`` minus the first and last character."""
    try:
        text = str(value)
    except UnicodeDecodeError:
        text = repr(value)[1:-1]
    return text


def iter_event_stacktraces(event):
    # type: (Event) -> Iterator[Dict[str, Any]]
    """Yield every stack trace in ``event``.

    Order: the top-level ``stacktrace``, then those of ``threads`` values,
    then those of ``exception`` values.
    """
    if "stacktrace" in event:
        yield event["stacktrace"]

    for section in ("threads", "exception"):
        if section not in event:
            continue
        for entry in event[section].get("values") or ():
            if "stacktrace" in entry:
                yield entry["stacktrace"]


def iter_event_frames(event):
    # type: (Event) -> Iterator[Dict[str, Any]]
    """Yield every frame of every stack trace found in ``event``."""
    for stacktrace in iter_event_stacktraces(event):
        yield from stacktrace.get("frames") or ()


def handle_in_app(event, in_app_exclude=None, in_app_include=None, project_root=None):
    # type: (Event, Optional[List[str]], Optional[List[str]], Optional[str]) -> Event
    """Apply ``set_in_app_in_frames`` to the frames of every stack trace in ``event`` and return ``event``."""
    settings = {
        "in_app_exclude": in_app_exclude,
        "in_app_include": in_app_include,
        "project_root": project_root,
    }
    for stacktrace in iter_event_stacktraces(event):
        frames = stacktrace.get("frames")
        set_in_app_in_frames(frames, **settings)

    return event


def set_in_app_in_frames(frames, in_app_exclude, in_app_include, project_root=None):
    # type: (Any, Optional[List[str]], Optional[List[str]], Optional[str]) -> Optional[Any]
    """Fill in the ``in_app`` flag on frames that do not have one yet.

    Rules, first match wins: module in the include list -> True; module in the
    exclude list -> False; path inside site-/dist-packages -> False; path under
    ``project_root`` -> True. Frames matching nothing are left untouched.

    Returns the same ``frames`` object, or ``None`` when ``frames`` is empty.
    """
    if not frames:
        return None

    for frame in frames:
        # An existing decision is never overridden.
        if frame.get("in_app") is not None:
            continue

        module = frame.get("module")

        if _module_in_list(module, in_app_include):
            frame["in_app"] = True
        elif _module_in_list(module, in_app_exclude):
            frame["in_app"] = False
        else:
            # The remaining checks are path based and need an abs_path.
            abs_path = frame.get("abs_path")
            if abs_path is None:
                continue

            if _is_external_source(abs_path):
                frame["in_app"] = False
            elif _is_in_project_root(abs_path, project_root):
                frame["in_app"] = True

    return frames


def exc_info_from_error(error):
    # type: (Union[BaseException, ExcInfo]) -> ExcInfo
    """Normalise an exception instance or a 3-tuple into an ``exc_info`` triple.

    For an instance without its own traceback, the traceback of the exception
    currently being handled is used, but only if that is the same instance.

    Raises:
        ValueError: if ``error`` is neither a 3-tuple nor an exception instance.
    """
    is_triple = isinstance(error, tuple) and len(error) == 3
    if not is_triple and not isinstance(error, BaseException):
        raise ValueError("Expected Exception object to report, got %s!" % type(error))

    if is_triple:
        exc_type, exc_value, tb = error
    else:
        tb = getattr(error, "__traceback__", None)
        if tb is None:
            exc_type, exc_value, tb = sys.exc_info()
            if exc_value is not error:
                # A different exception (or none) is being handled, so its
                # traceback does not belong to ``error``.
                exc_type, exc_value, tb = type(error), error, None
        else:
            exc_type, exc_value = type(error), error

    exc_info = (exc_type, exc_value, tb)

    if TYPE_CHECKING:
        # This cast is safe because exc_type and exc_value are either both
        # None or both not None.
        exc_info = cast(ExcInfo, exc_info)

    return exc_info


def event_from_exception(
    exc_info,  # type: Union[BaseException, ExcInfo]
    client_options=None,  # type: Optional[Dict[str, Any]]
    mechanism=None,  # type: Optional[Dict[str, Any]]
):
    # type: (...) -> Tuple[Event, Dict[str, Any]]
    """Build an error-level event and its hint from an exception or ``exc_info`` triple.

    Returns ``(event, hint)``: the event carries the serialised exceptions
    under ``exception.values``; the hint carries the normalised ``exc_info``.
    """
    normalized = exc_info_from_error(exc_info)
    hint = event_hint_with_exc_info(normalized)
    values = exceptions_from_error_tuple(normalized, client_options, mechanism)
    event = {
        "level": "error",
        "exception": {"values": values},
    }
    return (event, hint)


def _module_in_list(name, items):
    # type: (str, Optional[List[str]]) -> bool
    """Tell whether module ``name`` equals, or is a submodule of, any entry in ``items``."""
    if name is None or not items:
        return False

    return any(name == item or name.startswith(item + ".") for item in items)


def _is_external_source(abs_path):
    # type: (str) -> bool
    """Tell whether ``abs_path`` contains a ``site-packages`` or ``dist-packages`` directory component."""
    packages_dir = r"[\\/](?:dist|site)-packages[\\/]"
    return re.search(packages_dir, abs_path) is not None


def _is_in_project_root(abs_path, project_root):
    # type: (str, Optional[str]) -> bool
    """Tell whether ``abs_path`` starts with ``project_root``; always False without a root.

    This is a plain string-prefix comparison.
    """
    return project_root is not None and abs_path.startswith(project_root)


def _truncate_by_bytes(string, max_bytes):
    # type: (str, int) -> str
    """
    Cut ``string`` to ``max_bytes - 3`` bytes of its UTF-8 encoding and append ``...``.

    A code point split by the cut is dropped rather than emitted partially.
    """
    budget = max_bytes - 3
    encoded = string.encode("utf-8")
    head = encoded[:budget].decode("utf-8", errors="ignore")
    return head + "..."


def _get_size_in_bytes(value):
    # type: (str) -> Optional[int]
    """Return the length of ``value`` encoded as UTF-8, or ``None`` if it cannot be encoded."""
    try:
        encoded = value.encode("utf-8")
    except (UnicodeEncodeError, UnicodeDecodeError):
        return None
    return len(encoded)


def strip_string(value, max_length=None):
    # type: (str, Optional[int]) -> Union[AnnotatedValue, str]
    """Shorten ``value`` to ``max_length`` (default ``DEFAULT_MAX_VALUE_LENGTH``).

    Values within the limit, and falsy values, are returned unchanged. Longer
    values come back as an ``AnnotatedValue`` holding the truncated text plus
    metadata recording the original size and the limit that was applied.
    """
    if not value:
        return value

    limit = DEFAULT_MAX_VALUE_LENGTH if max_length is None else max_length

    byte_size = _get_size_in_bytes(value)
    text_size = len(value)

    if byte_size is not None and byte_size > limit:
        # truncate to the byte limit, preserving code points
        shortened = _truncate_by_bytes(value, limit)
    elif text_size is not None and text_size > limit:
        # fallback to truncating by string length
        shortened = value[: limit - 3] + "..."
    else:
        return value

    remark = ["!limit", "x", limit - 3, limit]
    return AnnotatedValue(
        value=shortened,
        metadata={
            "len": byte_size or text_size,
            "rem": [remark],
        },
    )
