Source code for outputfile

#
# MIT License
#
# Copyright (c) 2023 nbiotcloud
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
"""
Timestamp Preserving Output File Writer.

``outputfile.open_`` behaves identically to ``open(..., mode="w")``:

>>> from outputfile import open_
>>> from pathlib import Path
>>> filepath = Path('file.txt')
>>> with open_(filepath) as file:
...     file.write("foo")

but the timestamp stays the same, if the file content did not change:

>>> mtime = filepath.stat().st_mtime
>>> with open_(filepath) as file:
...     file.write("foo")
>>> mtime - filepath.stat().st_mtime
0.0


The ``state`` attribute details the file handling status:

>>> otherpath = Path('other.txt')
>>> # first write
>>> with open_(otherpath) as file:
...     file.write("foo")
>>> file.state.name
'CREATED'
>>> # same write
>>> with open_(otherpath) as file:
...     file.write("foo")
>>> file.state.name
'IDENTICAL'
>>> # other write
>>> with open_(otherpath) as file:
...     file.write("bar")
>>> file.state.name
'UPDATED'

The argument ``existing`` defines the update strategy and can be ``Existing.KEEP`` ...

>>> keep = Path('keep.txt')
>>> # first write
>>> with open_(keep, existing=Existing.KEEP) as file:
...     file.write("foo")
>>> file.state.name
'CREATED'
>>> # same write
>>> with open_(keep, existing=Existing.KEEP) as file:
...     file.write("foo")
>>> file.state.name
'EXISTING'
>>> # other write
>>> with open_(keep, existing=Existing.KEEP) as file:
...     file.write("bar")
>>> file.state.name
'EXISTING'

... or ``Existing.OVERWRITE``

>>> overwrite = Path('overwrite.txt')
>>> # first write
>>> with open_(overwrite, existing=Existing.OVERWRITE) as file:
...     file.write("foo")
>>> file.state.name
'CREATED'
>>> # same write
>>> with open_(overwrite, existing=Existing.OVERWRITE) as file:
...     file.write("foo")
>>> file.state.name
'OVERWRITTEN'
>>> # other write
>>> with open_(overwrite, existing=Existing.OVERWRITE) as file:
...     file.write("bar")
>>> file.state.name
'OVERWRITTEN'
"""
import difflib
import filecmp
import tempfile
from enum import Enum
from os import fdopen as _fdopen
from pathlib import Path
from shutil import copyfile
from typing import Union

__all__ = ["Existing", "State", "open_", "OutputFile"]


class Existing(Enum):
    """
    Policy applied when the output file is already present on disk.

    Members can be looked up by their string value, e.g. ``Existing("keep")``.
    """

    # Refuse to touch an already existing file.
    ERROR = "error"

    # Leave an already existing file as it is.
    KEEP = "keep"

    # Write straight to the target file, replacing previous content.
    OVERWRITE = "overwrite"

    # Write to a temporary file first and only touch the target on a content change.
    KEEP_TIMESTAMP = "keep_timestamp"
class State(Enum):
    """
    Processing status of an output file.

    The member values are short human-readable status texts.
    """

    # File is open and accepts writes.
    OPEN = "OPEN"

    # Existing file received different content.
    UPDATED = "UPDATED."

    # Existing file already had the written content and was left alone.
    IDENTICAL = "identical. untouched."

    # File did not exist before and has been written.
    CREATED = "CREATED."

    # Existing file was written unconditionally.
    OVERWRITTEN = "OVERWRITTEN."

    # Existing file was kept, written data was discarded.
    EXISTING = "existing. SKIPPED."

    # The write was aborted by an exception.
    FAILED = "FAILED."
def open_(
    filepath: Union[Path, str],
    existing: Union[Existing, str] = Existing.KEEP_TIMESTAMP,
    mkdir: bool = False,
    diffout=None,
    **kwargs,
):
    """
    Create an :any:`OutputFile` for writing to ``filepath``.

    A plain ``open(..., "w")`` refreshes the modification time of a file on
    every write, even if the resulting content is the same as before.
    With the default strategy (:any:`Existing.KEEP_TIMESTAMP`) the returned
    object collects all output in a temporary file instead. When the object is
    closed, the temporary file is compared with the target file:

    * identical content: the target file is left untouched,
    * different content or missing target: the content is copied to the target.

    The temporary file is removed afterwards in both cases.

    Args:
        filepath (Path, str): Path to the target file.

    Keyword Args:
        existing (Existing, str): Handling of existing output files:

            * :any:`Existing.ERROR`: raise a ``FileExistsError`` if the file
              exists already.
            * :any:`Existing.KEEP`: continue, without modifying the existing file.
            * :any:`Existing.OVERWRITE`: always overwrite the output file, like
              python's :any:`open` would do.
            * :any:`Existing.KEEP_TIMESTAMP`: write to a temporary file and update
              the target file only if the content differs.

        mkdir (bool): create the output directory if it does not exist.
        diffout: function receiving the file diff on update.

    Returns:
        OutputFile: the opened output file object.

    Raises:
        FileExistsError: if ``existing="error"`` and the file exists already.
        FileNotFoundError: if the output directory is missing and ``mkdir`` is
            not set.

    All remaining keyword arguments are handed over to :any:`OutputFile`, which
    uses them as options when opening the file. The file is always opened in
    write mode (``"w"``), so ``mode`` should not be given.
    """
    settings = {
        "existing": existing,
        "mkdir": mkdir,
        "diffout": diffout,
        "kwargs": kwargs,
    }
    return OutputFile(filepath, **settings)
class OutputFile:
    """
    Write-only file object which can leave an unchanged target file untouched.

    Depending on the ``existing`` strategy the data is written

    * to a temporary file, which is compared against the target on close
      (:any:`Existing.KEEP_TIMESTAMP`),
    * directly to the target file (:any:`Existing.OVERWRITE`, or any strategy
      if the target does not exist yet), or
    * nowhere, because an existing file is kept (:any:`Existing.KEEP`).

    The outcome is reported through the :any:`state` property once the file is closed.
    Instances can be used as context manager.
    """

    # pylint: disable=too-many-arguments,too-many-instance-attributes

    def __init__(
        self,
        filepath: Union[Path, str],
        existing: Union[Existing, str] = Existing.KEEP_TIMESTAMP,
        mkdir: bool = False,
        diffout=None,
        kwargs=None,
    ) -> None:
        """
        File object returned by :any:`open_`.

        Args:
            filepath: Path to the target file.

        Keyword Args:
            existing: Strategy for an already existing target file, as member or value of :any:`Existing`.
            mkdir: Create the parent directory of ``filepath`` if it is missing.
            diffout: Callable receiving the unified diff text when an existing file got updated.
            kwargs: Options for the underlying ``open`` call. ``encoding`` defaults to ``"utf-8"``,
                    ``mode`` is ignored as the file is always opened with ``"w"``.

        Raises:
            FileExistsError: if ``existing`` is :any:`Existing.ERROR` and the file exists already.
            FileNotFoundError: if the parent directory is missing and ``mkdir`` is not set.
        """
        super().__init__()
        if isinstance(filepath, str):
            filepath = Path(filepath)
        if isinstance(existing, str):
            existing = Existing(existing)
        self.filepath = filepath
        self.existing = existing
        self.mkdir = mkdir
        self.diffout = diffout
        self.__handle = None
        self.__open_state = False
        self.__state = None
        self.__file_exists = filepath.exists()
        self.__tmp_filepath = None
        # Work on a copy: the defaults applied in __open must not leak into the caller's dict.
        self.__open(dict(kwargs or {}))

    @property
    def state(self):
        """Current :any:`State` of the file."""
        return self.__state

    def write(self, *args, **kwargs):
        """
        Write to file.

        See :py:meth:`io.TextIOBase.write` for reference.
        The data is silently dropped if an existing file is kept (:any:`Existing.KEEP`).

        Returns:
            None

        Raises:
            ValueError: when the file is already closed.
        """
        if self.__handle:
            self.__handle.write(*args, **kwargs)
        elif not self.__open_state:
            raise ValueError("I/O Error. Write on closed file.")

    def close(self):
        """Close the file and finalize its :any:`state`. Closing twice is harmless."""
        self.__close()

    def flush(self):
        """Flush file content."""
        if self.__handle:
            self.__handle.flush()

    @property
    def closed(self):
        """True, when the file has been closed and is not writable anymore."""
        return not self.__open_state

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # An exception within the with-block marks the write as failed, the exception itself propagates.
        if exc_type is not None:
            self.__state = State.FAILED
        self.__close()

    def __open(self, opts):
        """Check the preconditions and open the handle required by the ``existing`` strategy."""
        # The mode is passed positionally as "w" below. A user supplied "mode" would collide
        # with it and raise a TypeError, so it is dropped.
        opts.pop("mode", None)
        opts.setdefault("encoding", "utf-8")
        filepath = self.filepath
        existing = self.existing

        # Do not overwrite
        if self.__file_exists and existing == Existing.ERROR:
            raise FileExistsError(filepath)

        # Parent Directory
        filedir = filepath.parent
        if not filedir.exists():
            if self.mkdir:
                filedir.mkdir(parents=True, exist_ok=True)
            else:
                raise FileNotFoundError(f"Output directory '{filedir!s}' does not exists.")

        # open
        if existing == Existing.KEEP_TIMESTAMP:
            file, tmp_filepath = tempfile.mkstemp()
            self.__tmp_filepath = Path(tmp_filepath)
            try:
                self.__handle = _fdopen(file, "w", **opts)
            except Exception:
                # Do not leave an orphaned temporary file behind, if the options are rejected.
                self.__tmp_filepath.unlink()
                self.__tmp_filepath = None
                raise
        elif not self.__file_exists or existing != Existing.KEEP:
            # pylint: disable=consider-using-with,unspecified-encoding
            self.__handle = open(filepath, "w", **opts)
        # Existing.KEEP with an existing file: no handle at all, writes are dropped.
        self.__open_state = True
        self.__state = State.OPEN

    def __close(self):
        """Close the handle, update the target file if required and set the final state."""
        if not self.__open_state:
            return
        diff = None
        try:
            if self.existing == Existing.KEEP_TIMESTAMP:
                diff = self.__finalize_tmp()
            elif self.__handle:
                self.__handle.flush()
                self.__handle.close()
                if self.__state != State.FAILED:  # pragma: no cover
                    if self.__file_exists:
                        self.__state = State.OVERWRITTEN
                    else:
                        self.__state = State.CREATED
            elif self.__state != State.FAILED:
                # Kept file. A failure flagged by __exit__ stays visible, like in the other branches.
                self.__state = State.EXISTING
        except Exception:
            self.__state = State.FAILED
            raise
        finally:
            # Always leave the object closed, even if finalizing raised.
            self.__handle = None
            self.__open_state = False
        if self.diffout and diff:
            self.diffout(diff)

    def __finalize_tmp(self):
        """
        Close the temporary file and transfer its content to the target file on change.

        Returns:
            The unified diff text, if ``diffout`` is set and an existing file got updated. Otherwise None.
        """
        diff = None
        try:
            self.__handle.flush()
            self.__handle.close()
            if self.__state != State.FAILED:
                # True: content differs, False: identical, None: target does not exist yet
                is_modified = _is_modified(self.filepath, self.__tmp_filepath)
                if self.diffout and is_modified is True:
                    diff = _get_diff(self.filepath, self.__tmp_filepath)
                if is_modified is not False:
                    copyfile(self.__tmp_filepath, self.filepath)
                self.__state = {
                    True: State.UPDATED,
                    False: State.IDENTICAL,
                    None: State.CREATED,
                }[is_modified]
        finally:
            # The temporary file is removed in any case, also on failures above.
            self.__tmp_filepath.unlink()
            self.__tmp_filepath = None
        return diff
def _is_modified(path0, path1):
    """
    Compare the content of two files.

    Returns:
        None if at least one of the files does not exist,
        True if the file contents differ,
        False if the file contents are identical.
    """
    if path0.exists() and path1.exists():
        # shallow=False forces a content comparison instead of trusting the os.stat() signature
        return not filecmp.cmp(path0, path1, shallow=False)
    return None


def _get_diff(filepath0, filepath1):
    """Return the unified diff from ``filepath0`` to ``filepath1`` as one string, both read as UTF-8."""
    with open(filepath0, encoding="utf-8") as old_file, open(filepath1, encoding="utf-8") as new_file:
        old_lines = old_file.readlines()
        new_lines = new_file.readlines()
    return "".join(difflib.unified_diff(old_lines, new_lines))