""":module: watchdog.observers.fsevents
:synopsis: FSEvents based emitter implementation.
:author: yesudeep@google.com (Yesudeep Mangalapilly)
:author: contact@tiger-222.fr (Mickaël Schoentgen)
:platforms: macOS
"""

from __future__ import annotations

import logging
import os
import threading
import time
import unicodedata
from typing import TYPE_CHECKING

import _watchdog_fsevents as _fsevents

from watchdog.events import (
    DirCreatedEvent,
    DirDeletedEvent,
    DirModifiedEvent,
    DirMovedEvent,
    FileCreatedEvent,
    FileDeletedEvent,
    FileModifiedEvent,
    FileMovedEvent,
    generate_sub_created_events,
    generate_sub_moved_events,
)
from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter
from watchdog.utils.dirsnapshot import DirectorySnapshot

if TYPE_CHECKING:
    from watchdog.events import FileSystemEvent, FileSystemEventHandler
    from watchdog.observers.api import EventQueue, ObservedWatch


logger = logging.getLogger("fsevents")


class FSEventsEmitter(EventEmitter):
    """macOS FSEvents Emitter class.

    :param event_queue:
        The event queue to fill with events.
    :param watch:
        A watch object representing the directory to monitor.
    :type watch:
        :class:`watchdog.observers.api.ObservedWatch`
    :param timeout:
        Read events blocking timeout (in seconds).
    :param event_filter:
        Collection of event types to emit, or None for no filtering (default).
    :param suppress_history:
        The FSEvents API may emit historic events up to 30 sec before the watch was
        started. When ``suppress_history`` is ``True``, those events will be suppressed
        by creating a directory snapshot of the watched path before starting the stream
        as a reference to suppress old events. Warning: This may result in significant
        memory usage in case of a large number of items in the watched path.
    :type timeout:
        ``float``
    """

    def __init__(
        self,
        event_queue: EventQueue,
        watch: ObservedWatch,
        *,
        timeout: float = DEFAULT_EMITTER_TIMEOUT,
        event_filter: list[type[FileSystemEvent]] | None = None,
        suppress_history: bool = False,
    ) -> None:
        super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
        self._fs_view: set[int] = set()
        self.suppress_history = suppress_history
        self._start_time = 0.0
        self._starting_state: DirectorySnapshot | None = None
        self._lock = threading.Lock()
        self._absolute_watch_path = os.path.realpath(os.path.abspath(os.path.expanduser(self.watch.path)))

    def on_thread_stop(self) -> None:
        _fsevents.remove_watch(self.watch)
        _fsevents.stop(self)

    def queue_event(self, event: FileSystemEvent) -> None:
        # fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop
        # all the events here which do not have a src_path / dest_path that matches the watched path
        if self._watch.is_recursive or not self._is_recursive_event(event):
            logger.debug("queue_event %s", event)
            EventEmitter.queue_event(self, event)
        else:
            logger.debug("drop event %s", event)

    def _is_recursive_event(self, event: FileSystemEvent) -> bool:
        src_path = event.src_path if event.is_directory else os.path.dirname(event.src_path)
        if src_path == self._absolute_watch_path:
            return False

        if isinstance(event, (FileMovedEvent, DirMovedEvent)):
            # when moving something into the watch path we must always take the dirname,
            # otherwise we miss out on `DirMovedEvent`s
            dest_path = os.path.dirname(event.dest_path)
            if dest_path == self._absolute_watch_path:
                return False

        return True

    def _queue_created_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
        cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
        self.queue_event(cls(src_path))
        self.queue_event(DirModifiedEvent(dirname))

    def _queue_deleted_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
        cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
        self.queue_event(cls(src_path))
        self.queue_event(DirModifiedEvent(dirname))

    def _queue_modified_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
        cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
        self.queue_event(cls(src_path))

    def _queue_renamed_event(
        self,
        src_event: FileSystemEvent,
        src_path: bytes | str,
        dst_path: bytes | str,
        src_dirname: bytes | str,
        dst_dirname: bytes | str,
    ) -> None:
        cls = DirMovedEvent if src_event.is_directory else FileMovedEvent
        dst_path = self._encode_path(dst_path)
        self.queue_event(cls(src_path, dst_path))
        self.queue_event(DirModifiedEvent(src_dirname))
        self.queue_event(DirModifiedEvent(dst_dirname))

    def _is_historic_created_event(self, event: _fsevents.NativeEvent) -> bool:
        # We only queue a created event if the item was created after we
        # started the FSEventsStream.

        in_history = event.inode in self._fs_view

        if self._starting_state:
            try:
                old_inode = self._starting_state.inode(event.path)[0]
                before_start = old_inode == event.inode
            except KeyError:
                before_start = False
        else:
            before_start = False

        return in_history or before_start

    @staticmethod
    def _is_meta_mod(event: _fsevents.NativeEvent) -> bool:
        """Returns True if the event indicates a change in metadata."""
        return event.is_inode_meta_mod or event.is_xattr_mod or event.is_owner_change

    def queue_events(self, timeout: float, events: list[_fsevents.NativeEvent]) -> None:  # type: ignore[override]
        if logger.getEffectiveLevel() <= logging.DEBUG:
            for event in events:
                flags = ", ".join(attr for attr in dir(event) if getattr(event, attr) is True)
                logger.debug("%s: %s", event, flags)

        if time.monotonic() - self._start_time > 60:
            # Event history is no longer needed, let's free some memory.
            self._starting_state = None

        while events:
            event = events.pop(0)

            src_path = self._encode_path(event.path)
            src_dirname = os.path.dirname(src_path)

            try:
                stat = os.stat(src_path)
            except OSError:
                stat = None

            exists = stat and stat.st_ino == event.inode

            # FSevents may coalesce multiple events for the same item + path into a
            # single event. However, events are never coalesced for different items at
            # the same path or for the same item at different paths. Therefore, the
            # event chains "removed -> created" and "created -> renamed -> removed" will
            # never emit a single native event and a deleted event *always* means that
            # the item no longer existed at the end of the event chain.

            # Some events will have a spurious `is_created` flag set, coalesced from an
            # already emitted and processed CreatedEvent. To filter those, we keep track
            # of all inodes which we know to be already created. This is safer than
            # keeping track of paths since paths are more likely to be reused than
            # inodes.

            # Likewise, some events will have a spurious `is_modified`,
            # `is_inode_meta_mod` or `is_xattr_mod` flag set. We currently do not
            # suppress those but could do so if the item still exists by caching the
            # stat result and verifying that it did change.

            if event.is_created and event.is_removed:
                # Events will only be coalesced for the same item / inode.
                # The sequence deleted -> created therefore cannot occur.
                # Any combination with renamed cannot occur either.

                if not self._is_historic_created_event(event):
                    self._queue_created_event(event, src_path, src_dirname)

                self._fs_view.add(event.inode)

                if event.is_modified or self._is_meta_mod(event):
                    self._queue_modified_event(event, src_path, src_dirname)

                self._queue_deleted_event(event, src_path, src_dirname)
                self._fs_view.discard(event.inode)

            else:
                if event.is_created and not self._is_historic_created_event(event):
                    self._queue_created_event(event, src_path, src_dirname)

                self._fs_view.add(event.inode)

                if event.is_modified or self._is_meta_mod(event):
                    self._queue_modified_event(event, src_path, src_dirname)

                if event.is_renamed:
                    # Check if we have a corresponding destination event in the watched path.
                    dst_event = next(
                        iter(e for e in events if e.is_renamed and e.inode == event.inode),
                        None,
                    )

                    if dst_event:
                        # Item was moved within the watched folder.
                        logger.debug("Destination event for rename is %s", dst_event)

                        dst_path = self._encode_path(dst_event.path)
                        dst_dirname = os.path.dirname(dst_path)

                        self._queue_renamed_event(event, src_path, dst_path, src_dirname, dst_dirname)
                        self._fs_view.add(event.inode)

                        for sub_moved_event in generate_sub_moved_events(src_path, dst_path):
                            self.queue_event(sub_moved_event)

                        # Process any coalesced flags for the dst_event.

                        events.remove(dst_event)

                        if dst_event.is_modified or self._is_meta_mod(dst_event):
                            self._queue_modified_event(dst_event, dst_path, dst_dirname)

                        if dst_event.is_removed:
                            self._queue_deleted_event(dst_event, dst_path, dst_dirname)
                            self._fs_view.discard(dst_event.inode)

                    elif exists:
                        # This is the destination event, item was moved into the watched
                        # folder.
                        self._queue_created_event(event, src_path, src_dirname)
                        self._fs_view.add(event.inode)

                        for sub_created_event in generate_sub_created_events(src_path):
                            self.queue_event(sub_created_event)

                    else:
                        # This is the source event, item was moved out of the watched
                        # folder.
                        self._queue_deleted_event(event, src_path, src_dirname)
                        self._fs_view.discard(event.inode)

                        # Skip further coalesced processing.
                        continue

                if event.is_removed:
                    # Won't occur together with renamed.
                    self._queue_deleted_event(event, src_path, src_dirname)
                    self._fs_view.discard(event.inode)

            if event.is_root_changed:
                # This will be set if root or any of its parents is renamed or deleted.
                # TODO: find out new path and generate DirMovedEvent?
                self.queue_event(DirDeletedEvent(self.watch.path))
                logger.debug("Stopping because root path was changed")
                self.stop()

                self._fs_view.clear()

    def events_callback(self, paths: list[bytes], inodes: list[int], flags: list[int], ids: list[int]) -> None:
        """Callback passed to FSEventStreamCreate(), it will receive all
        FS events and queue them.
        """
        cls = _fsevents.NativeEvent
        try:
            events = [
                cls(path, inode, event_flags, event_id)
                for path, inode, event_flags, event_id in zip(paths, inodes, flags, ids)
            ]
            with self._lock:
                self.queue_events(self.timeout, events)
        except Exception:
            logger.exception("Unhandled exception in fsevents callback")

    def run(self) -> None:
        self.pathnames = [self.watch.path]
        self._start_time = time.monotonic()
        try:
            _fsevents.add_watch(self, self.watch, self.events_callback, self.pathnames)
            _fsevents.read_events(self)
        except Exception:
            logger.exception("Unhandled exception in FSEventsEmitter")

    def on_thread_start(self) -> None:
        if self.suppress_history:
            watch_path = os.fsdecode(self.watch.path) if isinstance(self.watch.path, bytes) else self.watch.path
            self._starting_state = DirectorySnapshot(watch_path)

    def _encode_path(self, path: bytes | str) -> bytes | str:
        """Encode path only if bytes were passed to this emitter."""
        return os.fsencode(path) if isinstance(self.watch.path, bytes) else path


class FSEventsObserver(BaseObserver):
    def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
        super().__init__(FSEventsEmitter, timeout=timeout)

    def schedule(
        self,
        event_handler: FileSystemEventHandler,
        path: str,
        *,
        recursive: bool = False,
        event_filter: list[type[FileSystemEvent]] | None = None,
    ) -> ObservedWatch:
        # Fix for issue #26: Trace/BPT error when given a unicode path
        # string. https://github.com/gorakhargosh/watchdog/issues#issue/26
        if isinstance(path, str):
            path = unicodedata.normalize("NFC", path)

        return super().schedule(event_handler, path, recursive=recursive, event_filter=event_filter)
