sapling/eden/cli/logfile.py
Adam Simpkins 9bfb48c921 update license headers in .py files
Summary:
Update the copyright & license headers in Python files to reflect the
relicensing to GPLv2

Reviewed By: wez

Differential Revision: D15487088

fbshipit-source-id: 9f2138dff41048d2c35f15e09a04ae5a9c9c80dd
2019-06-19 17:02:46 -07:00

63 lines
1.7 KiB
Python

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2.
import asyncio
import contextlib
import io
import pathlib
import typing
_BinaryIO = typing.Union[typing.IO[bytes], io.BufferedIOBase]
@contextlib.contextmanager
def forward_log_file(
file_path: pathlib.Path, output_file: _BinaryIO
) -> typing.Iterator["LogForwarder"]:
with follow_log_file(file_path) as follower:
yield LogForwarder(follower=follower, output_file=output_file)
class LogForwarder:
__follower: "LogFollower"
__output_file: _BinaryIO
def __init__(self, follower: "LogFollower", output_file: _BinaryIO) -> None:
super().__init__()
self.__follower = follower
self.__output_file = output_file
def poll(self) -> None:
# pyre-fixme[29]: `Union[Callable[[Union[bytearray, bytes]], int],
# Callable[[bytes], int]]` is not a function.
self.__output_file.write(self.__follower.poll())
self.__output_file.flush()
async def poll_forever_async(self) -> typing.NoReturn:
while True:
self.poll()
await asyncio.sleep(0.1)
@contextlib.contextmanager
def follow_log_file(file_path: pathlib.Path) -> typing.Iterator["LogFollower"]:
with open(file_path, "rb") as file:
yield LogFollower(file)
class LogFollower:
__file: _BinaryIO
def __init__(self, file: _BinaryIO) -> None:
super().__init__()
self.__file = file
def poll(self) -> bytes:
# pyre-fixme[29]: `Union[Callable[[Optional[int]], bytes], Callable[[int],
# bytes]]` is not a function.
return self.__file.read()