mirror of
https://github.com/facebook/sapling.git
synced 2024-10-06 06:47:41 +03:00
8996dbe0a8
Summary: This diff adds a basic integration test for `eden debug subscribe` command. The reason it uses async is so we can set a timeout on how long we should wait for new messages from stdout. (The alternative is to use `select.select`, which does not work on Windows). Reviewed By: MichaelCuevas Differential Revision: D40453344 fbshipit-source-id: 1116b9a7c3ec7ef82b41df198bca5dec24639b48
120 lines
3.8 KiB
Python
120 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
#
|
|
# This software may be used and distributed according to the terms of the
|
|
# GNU General Public License version 2.
|
|
|
|
import asyncio
|
|
import json
|
|
import platform
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import typing as t
|
|
|
|
from .lib import testcase
|
|
|
|
|
|
@testcase.eden_repo_test
|
|
class DebugSubscribeTest(testcase.EdenRepoTest):
|
|
def populate_repo(self) -> None:
|
|
self.repo.write_file("hello", "hola\n")
|
|
self.repo.commit("Initial commit.")
|
|
|
|
def setUp(self) -> None:
|
|
super().setUp()
|
|
if platform.system() == "Windows" and not getattr(self, "loop", None):
|
|
# This is required on Windows
|
|
# pyre-ignore[16]: Windows only
|
|
self.loop = asyncio.ProactorEventLoop()
|
|
asyncio.set_event_loop(self.loop)
|
|
|
|
def tearDown(self) -> None:
|
|
if getattr(self, "loop", None):
|
|
self.loop.close()
|
|
super().tearDown()
|
|
|
|
async def subscribe(self) -> asyncio.subprocess.Process:
|
|
cmd, env = self.eden.get_edenfsctl_cmd_env(
|
|
"debug",
|
|
"subscribe",
|
|
self.mount,
|
|
"--throttle",
|
|
"5",
|
|
"--guard",
|
|
"1",
|
|
)
|
|
env["EDENFS_LOG"] = "edenfs=trace"
|
|
|
|
return await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
env=env,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
cwd=self.mount,
|
|
)
|
|
|
|
async def next_event(
|
|
self, sub: asyncio.subprocess.Process, timeout: int = 5
|
|
) -> t.Optional[t.Dict[str, t.Any]]:
|
|
"""Wait for the next event generated from the subscription.
|
|
|
|
We need asyncio here so we can set a timeout. Otherwise we will need to
|
|
manually select.
|
|
"""
|
|
stdout = sub.stdout
|
|
if not stdout:
|
|
raise RuntimeError("no stdout captured")
|
|
|
|
try:
|
|
line = await asyncio.wait_for(stdout.readline(), timeout=timeout)
|
|
except asyncio.TimeoutError:
|
|
return None
|
|
if not line:
|
|
return None
|
|
try:
|
|
return json.loads(line.decode())
|
|
except json.JSONDecodeError:
|
|
print(f"failed to decode: {line}", file=sys.stderr)
|
|
raise
|
|
|
|
async def wait_for_next_event(
|
|
self,
|
|
sub: asyncio.subprocess.Process,
|
|
action: t.Callable[[], None] = lambda: None,
|
|
attempt: int = 10,
|
|
) -> t.Optional[t.Dict[str, t.Any]]:
|
|
"""Pull until the next event generated from the subscription
|
|
|
|
When the system is under heavy load or when we are testing dev mode
|
|
binary, the subscription may finish initializing after the write event
|
|
is already done by the test. So we retry a few times to make sure the
|
|
subscription sees it.
|
|
"""
|
|
for _ in range(attempt):
|
|
time.sleep(0.1)
|
|
action()
|
|
event = await self.next_event(sub)
|
|
if event:
|
|
return event
|
|
return None
|
|
|
|
async def test_debug_subscribe(self) -> None:
|
|
subscription = await self.subscribe()
|
|
|
|
event = await self.wait_for_next_event(subscription)
|
|
self.assertIsNotNone(event)
|
|
self.assertIsNotNone(event["result"])
|
|
self.assertIsNotNone(event["result"]["message"])
|
|
self.assertTrue(event["result"]["message"].startswith("subscribed to"))
|
|
|
|
event = await self.wait_for_next_event(
|
|
subscription, lambda: self.write_file("hello", "test")
|
|
)
|
|
self.assertIsNotNone(event)
|
|
self.assertIsNotNone(event["result"])
|
|
self.assertIsNotNone(event["result"]["mount_generation"])
|
|
self.assertIsNotNone(event["result"]["sequence_number"])
|
|
self.assertIsNotNone(event["result"]["snapshot_hash"])
|