sapling/eden/fs/cli/logfile.py
Xavier Deguillard a29d465ee8 fs: fix license header
Summary:
With Facebook having been renamed Meta Platforms, we need to change the license
headers.

Reviewed By: fanzeyi

Differential Revision: D33407812

fbshipit-source-id: b11bfbbf13a48873f0cea75f212cc7b07a68fb2e
2022-01-04 15:00:07 -08:00

61 lines
1.5 KiB
Python

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2.
# pyre-strict
import asyncio
import contextlib
import io
import pathlib
import typing
# Type alias for the binary streams this module reads from and writes to:
# either a typing.IO[bytes] or an io.BufferedIOBase.
_BinaryIO = typing.Union[typing.IO[bytes], io.BufferedIOBase]
@contextlib.contextmanager
def forward_log_file(
    file_path: pathlib.Path, output_file: _BinaryIO
) -> typing.Iterator["LogForwarder"]:
    """Yield a LogForwarder that copies data from file_path to output_file.

    The log file is opened through follow_log_file() for the duration of the
    with-block; the yielded forwarder only moves data when it is polled.

    Args:
        file_path: Path of the log file to read from.
        output_file: Binary stream that receives the forwarded data.
    """
    with follow_log_file(file_path) as log_follower:
        forwarder = LogForwarder(follower=log_follower, output_file=output_file)
        yield forwarder
class LogForwarder:
    """Copy data produced by a log follower into an output stream.

    Each call to poll() asks the follower for whatever data it currently has
    and, if there is any, writes it to the output stream and flushes it.
    """

    __follower: "LogFollower"
    __output_file: "_BinaryIO"

    def __init__(self, follower: "LogFollower", output_file: "_BinaryIO") -> None:
        """Remember where to read from and where to write to.

        Args:
            follower: Object whose poll() returns the bytes to forward.
            output_file: Binary stream that receives the forwarded bytes.
        """
        super().__init__()
        self.__follower = follower
        self.__output_file = output_file

    def poll(self) -> None:
        """Forward the follower's currently available data, if any.

        When the follower has nothing to report, the output stream is left
        untouched so an idle log does not cost a write and a flush per poll.
        """
        data = self.__follower.poll()
        if not data:
            return
        self.__output_file.write(data)
        # Flush right away so forwarded data is not held back in a buffer.
        self.__output_file.flush()

    async def poll_forever_async(self, interval: float = 0.1) -> typing.NoReturn:
        """Call poll() in an endless loop, sleeping between iterations.

        This coroutine never returns normally; it only stops when an
        exception propagates out of poll() or out of the sleep.

        Args:
            interval: Seconds to sleep between polls. Defaults to 0.1.
        """
        while True:
            self.poll()
            await asyncio.sleep(interval)
@contextlib.contextmanager
def follow_log_file(file_path: pathlib.Path) -> typing.Iterator["LogFollower"]:
    """Open file_path for binary reading and yield a LogFollower over it.

    The file is closed when the with-block exits.

    Args:
        file_path: Path of the log file to follow.
    """
    with open(file_path, mode="rb") as log_handle:
        follower = LogFollower(log_handle)
        yield follower
class LogFollower:
    """Read data from an already-open binary stream, one poll at a time."""

    __file: "_BinaryIO"

    def __init__(self, file: "_BinaryIO") -> None:
        """Wrap an open binary stream.

        Args:
            file: Binary stream to read from. This class does not close it.
        """
        super().__init__()
        self.__file = file

    def poll(self) -> bytes:
        """Return everything readable from the stream's current position.

        Returns:
            The bytes read, or empty bytes when nothing is available.
        """
        data = self.__file.read()
        # A raw stream in non-blocking mode returns None when no data is
        # ready; normalize that so callers always receive bytes as declared.
        return data or b""