mirror of
https://github.com/facebook/sapling.git
synced 2024-10-08 07:49:11 +03:00
cli: strictify all of the cli tests
Summary: While I was fixing the cli_test I realized that the rest of the tests didn't enforce strict type checking, let's make sure it is enabled so `pyre -l eden` works for all the tests now and in the future. Reviewed By: chadaustin Differential Revision: D26356267 fbshipit-source-id: 4f026b6f96c410115a6a38d772f8e06ab977293b
This commit is contained in:
parent
ddb7859ffd
commit
7a0dec91ec
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import configparser
|
import configparser
|
||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
@ -22,14 +24,14 @@ from ..configinterpolator import EdenConfigInterpolator
|
|||||||
from ..configutil import EdenConfigParser, UnexpectedType
|
from ..configutil import EdenConfigParser, UnexpectedType
|
||||||
|
|
||||||
|
|
||||||
def get_toml_test_file_invalid():
|
def get_toml_test_file_invalid() -> str:
|
||||||
cfg_file = """
|
cfg_file = """
|
||||||
[core thisIsNotAllowed]
|
[core thisIsNotAllowed]
|
||||||
"""
|
"""
|
||||||
return cfg_file
|
return cfg_file
|
||||||
|
|
||||||
|
|
||||||
def get_toml_test_file_defaults():
|
def get_toml_test_file_defaults() -> str:
|
||||||
cfg_file = """
|
cfg_file = """
|
||||||
[core]
|
[core]
|
||||||
systemIgnoreFile = "/etc/eden/gitignore"
|
systemIgnoreFile = "/etc/eden/gitignore"
|
||||||
@ -44,7 +46,7 @@ reporter = 'pastry --title "eden rage from $(hostname)"'
|
|||||||
return cfg_file
|
return cfg_file
|
||||||
|
|
||||||
|
|
||||||
def get_toml_test_file_user_rc():
|
def get_toml_test_file_user_rc() -> str:
|
||||||
cfg_file = """
|
cfg_file = """
|
||||||
[core]
|
[core]
|
||||||
ignoreFile = "/home/${USER}/.gitignore-override"
|
ignoreFile = "/home/${USER}/.gitignore-override"
|
||||||
@ -56,7 +58,7 @@ scribe-cat = "/usr/local/bin/scribe_cat"
|
|||||||
return cfg_file
|
return cfg_file
|
||||||
|
|
||||||
|
|
||||||
def get_toml_test_file_system_rc():
|
def get_toml_test_file_system_rc() -> str:
|
||||||
cfg_file = """
|
cfg_file = """
|
||||||
["telemetry"]
|
["telemetry"]
|
||||||
scribe-cat = "/bad/path/to/scribe_cat"
|
scribe-cat = "/bad/path/to/scribe_cat"
|
||||||
|
@ -4,75 +4,85 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
from typing import List, NamedTuple, Tuple, Dict, Union
|
||||||
|
|
||||||
from eden.fs.cli.debug import FileStatsCMD
|
from eden.fs.cli.debug import FileStatsCMD
|
||||||
|
|
||||||
|
|
||||||
class DebugTest(unittest.TestCase):
|
class DebugTest(unittest.TestCase):
|
||||||
def test_get_largest_directories_by_count(self):
|
def test_get_largest_directories_by_count(self) -> None:
|
||||||
|
class TestCase(NamedTuple):
|
||||||
|
test_input: Tuple[List[Tuple[str, int]], int]
|
||||||
|
test_output: List[Dict[str, Union[int, str]]]
|
||||||
|
msg: str
|
||||||
|
|
||||||
test_cases = [
|
test_cases: List[TestCase] = [
|
||||||
{
|
TestCase(
|
||||||
"input": ([], 0),
|
test_input=([], 0),
|
||||||
"output": [{"path": ".", "file_count": 0}],
|
test_output=[{"path": ".", "file_count": 0}],
|
||||||
"msg": "empty directory with no minimum",
|
msg="empty directory with minimum of 1",
|
||||||
},
|
),
|
||||||
{
|
TestCase(
|
||||||
"input": ([], 1),
|
test_input=([], 1),
|
||||||
"output": [],
|
test_output=[],
|
||||||
"msg": "empty directory with minimum of 1",
|
msg="empty directory with minimum of 1",
|
||||||
},
|
),
|
||||||
{
|
TestCase(
|
||||||
"input": ([("dirA/filename", 1000)], 1),
|
test_input=([("dirA/filename", 1000)], 1),
|
||||||
"output": [
|
test_output=[
|
||||||
{"path": ".", "file_count": 1},
|
{"path": ".", "file_count": 1},
|
||||||
{"path": "dirA", "file_count": 1},
|
{"path": "dirA", "file_count": 1},
|
||||||
],
|
],
|
||||||
"msg": "single file with minimum of 1",
|
msg="single file with minimum of 1",
|
||||||
},
|
),
|
||||||
{
|
TestCase(
|
||||||
"input": ([("dirA/filename", 1000)], 2),
|
test_input=([("dirA/filename", 1000)], 2),
|
||||||
"output": [],
|
test_output=[],
|
||||||
"msg": "single file with minimum of 2",
|
msg="single file with minimum of 2",
|
||||||
},
|
),
|
||||||
{
|
TestCase(
|
||||||
"input": ([("dirA/filename", 1000), ("dirB/filename2", 50)], 1),
|
test_input=([("dirA/filename", 1000), ("dirB/filename2", 50)], 1),
|
||||||
"output": [
|
test_output=[
|
||||||
{"path": ".", "file_count": 2},
|
{"path": ".", "file_count": 2},
|
||||||
{"path": "dirA", "file_count": 1},
|
{"path": "dirA", "file_count": 1},
|
||||||
{"path": "dirB", "file_count": 1},
|
{"path": "dirB", "file_count": 1},
|
||||||
],
|
],
|
||||||
"msg": "two files with minimum of 1",
|
msg="two files with minimum of 1",
|
||||||
},
|
),
|
||||||
{
|
TestCase(
|
||||||
"input": ([("dirA/filename", 1000), ("dirB/filename2", 50)], 2),
|
test_input=([("dirA/filename", 1000), ("dirB/filename2", 50)], 2),
|
||||||
"output": [{"path": ".", "file_count": 2}],
|
test_output=[{"path": ".", "file_count": 2}],
|
||||||
"msg": "two files with minimum of 2",
|
msg="two files with minimum of 2",
|
||||||
},
|
),
|
||||||
{
|
TestCase(
|
||||||
"input": ([("filename", 1000), ("dirA/filename2", 50)], 1),
|
test_input=([("filename", 1000), ("dirA/filename2", 50)], 1),
|
||||||
"output": [
|
test_output=[
|
||||||
{"path": ".", "file_count": 2},
|
{"path": ".", "file_count": 2},
|
||||||
{"path": "dirA", "file_count": 1},
|
{"path": "dirA", "file_count": 1},
|
||||||
],
|
],
|
||||||
"msg": "file in root dir",
|
msg="file in root dir",
|
||||||
},
|
),
|
||||||
{
|
TestCase(
|
||||||
"input": ([("filename", 1000), ("dirA/dirB/dirC/filename2", 50)], 1),
|
test_input=([("filename", 1000), ("dirA/dirB/dirC/filename2", 50)], 1),
|
||||||
"output": [
|
test_output=[
|
||||||
{"path": ".", "file_count": 2},
|
{"path": ".", "file_count": 2},
|
||||||
{"path": "dirA", "file_count": 1},
|
{"path": "dirA", "file_count": 1},
|
||||||
{"path": "dirA/dirB", "file_count": 1},
|
{"path": "dirA/dirB", "file_count": 1},
|
||||||
{"path": "dirA/dirB/dirC", "file_count": 1},
|
{"path": "dirA/dirB/dirC", "file_count": 1},
|
||||||
],
|
],
|
||||||
"msg": "deeply nested file",
|
msg="deeply nested file",
|
||||||
},
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
for test_case in test_cases:
|
for test_case in test_cases:
|
||||||
|
path_and_sizes, min_file_count = test_case.test_input
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
FileStatsCMD.get_largest_directories_by_count(*test_case["input"]),
|
FileStatsCMD.get_largest_directories_by_count(
|
||||||
test_case["output"],
|
path_and_sizes, min_file_count
|
||||||
test_case["msg"],
|
),
|
||||||
|
test_case.test_output,
|
||||||
|
test_case.msg,
|
||||||
)
|
)
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import configparser
|
import configparser
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
@ -11,7 +13,7 @@ from .. import configinterpolator
|
|||||||
|
|
||||||
|
|
||||||
class InterpolatorTest(unittest.TestCase):
|
class InterpolatorTest(unittest.TestCase):
|
||||||
def test_basic_subs(self):
|
def test_basic_subs(self) -> None:
|
||||||
defaults = {"USER": "wez", "RECURSIVE": "a${RECURSIVE}b"}
|
defaults = {"USER": "wez", "RECURSIVE": "a${RECURSIVE}b"}
|
||||||
parser = configparser.ConfigParser(
|
parser = configparser.ConfigParser(
|
||||||
interpolation=configinterpolator.EdenConfigInterpolator(defaults)
|
interpolation=configinterpolator.EdenConfigInterpolator(defaults)
|
||||||
|
@ -4,7 +4,7 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
# pyre_strict
|
# pyre-strict
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
import errno
|
import errno
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import io
|
import io
|
||||||
|
|
||||||
import eden.fs.cli.ui
|
import eden.fs.cli.ui
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
|
@ -4,17 +4,20 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from eden.fs.cli.mtab import MountInfo, parse_macos_mount_output, parse_mtab
|
from eden.fs.cli.mtab import MountInfo, parse_macos_mount_output, parse_mtab
|
||||||
|
|
||||||
|
|
||||||
class MTabTest(unittest.TestCase):
|
class MTabTest(unittest.TestCase):
|
||||||
# The diffs for what is written to stdout can be large.
|
# The diffs for what is written to stdout can be large.
|
||||||
maxDiff = None
|
maxDiff: Optional[int] = None
|
||||||
|
|
||||||
def test_parse_mtab(self):
|
def test_parse_mtab(self) -> None:
|
||||||
contents = """\
|
contents = b"""\
|
||||||
homedir.eden.com:/home109/chadaustin/public_html /mnt/public/chadaustin nfs rw,context=user_u:object_r:user_home_dir_t,relatime,vers=3,rsize=65536,wsize=65536,namlen=255,soft,nosharecache,proto=tcp6,timeo=100,retrans=2,sec=krb5i,mountaddr=2401:db00:fffe:1007:face:0000:0:4007,mountvers=3,mountport=635,mountproto=udp6,local_lock=none,addr=2401:db00:fffe:1007:0000:b00c:0:4007 0 0
|
homedir.eden.com:/home109/chadaustin/public_html /mnt/public/chadaustin nfs rw,context=user_u:object_r:user_home_dir_t,relatime,vers=3,rsize=65536,wsize=65536,namlen=255,soft,nosharecache,proto=tcp6,timeo=100,retrans=2,sec=krb5i,mountaddr=2401:db00:fffe:1007:face:0000:0:4007,mountvers=3,mountport=635,mountproto=udp6,local_lock=none,addr=2401:db00:fffe:1007:0000:b00c:0:4007 0 0
|
||||||
squashfuse_ll /mnt/xarfuse/uid-0/2c071047-ns-4026531840 fuse.squashfuse_ll rw,nosuid,nodev,relatime,user_id=0,group_id=0 0 0
|
squashfuse_ll /mnt/xarfuse/uid-0/2c071047-ns-4026531840 fuse.squashfuse_ll rw,nosuid,nodev,relatime,user_id=0,group_id=0 0 0
|
||||||
bogus line here
|
bogus line here
|
||||||
@ -23,11 +26,11 @@ edenfs: /tmp/eden_test.4rec6drf/mounts/main fuse rw,nosuid,relatime,user_id=1386
|
|||||||
mount_infos = parse_mtab(contents)
|
mount_infos = parse_mtab(contents)
|
||||||
self.assertEqual(3, len(mount_infos))
|
self.assertEqual(3, len(mount_infos))
|
||||||
one, two, three = mount_infos
|
one, two, three = mount_infos
|
||||||
self.assertEqual("edenfs:", three.device)
|
self.assertEqual(b"edenfs:", three.device)
|
||||||
self.assertEqual("/tmp/eden_test.4rec6drf/mounts/main", three.mount_point)
|
self.assertEqual(b"/tmp/eden_test.4rec6drf/mounts/main", three.mount_point)
|
||||||
self.assertEqual("fuse", three.vfstype)
|
self.assertEqual(b"fuse", three.vfstype)
|
||||||
|
|
||||||
def test_parse_mtab_macos(self):
|
def test_parse_mtab_macos(self) -> None:
|
||||||
contents = b"""\
|
contents = b"""\
|
||||||
/dev/disk1s1 on / (apfs, local, journaled)
|
/dev/disk1s1 on / (apfs, local, journaled)
|
||||||
devfs on /dev (devfs, local, nobrowse)
|
devfs on /dev (devfs, local, nobrowse)
|
||||||
|
@ -4,17 +4,20 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from .. import stats_print
|
from .. import stats_print
|
||||||
from ..stats import DiagInfoCounters, get_counter_table, get_store_latency
|
from ..stats import DiagInfoCounters, get_counter_table, get_store_latency
|
||||||
|
|
||||||
|
|
||||||
class StatsTest(unittest.TestCase):
|
class StatsTest(unittest.TestCase):
|
||||||
maxDiff = None
|
maxDiff: Optional[int] = None
|
||||||
|
|
||||||
def test_print_heading(self):
|
def test_print_heading(self) -> None:
|
||||||
expected_output = """\
|
expected_output = """\
|
||||||
**********
|
**********
|
||||||
TheHeading
|
TheHeading
|
||||||
@ -25,7 +28,7 @@ class StatsTest(unittest.TestCase):
|
|||||||
stats_print.write_heading("TheHeading", out)
|
stats_print.write_heading("TheHeading", out)
|
||||||
self.assertEqual(out.getvalue(), expected_output)
|
self.assertEqual(out.getvalue(), expected_output)
|
||||||
|
|
||||||
def test_print_latency_record(self):
|
def test_print_latency_record(self) -> None:
|
||||||
matrix = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]
|
matrix = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]
|
||||||
expected_output = """\
|
expected_output = """\
|
||||||
| avg 1 2 3 4
|
| avg 1 2 3 4
|
||||||
@ -39,7 +42,7 @@ access | p90 9 10 11 12
|
|||||||
stats_print.write_latency_record("access", matrix, out)
|
stats_print.write_latency_record("access", matrix, out)
|
||||||
self.assertEqual(out.getvalue(), expected_output)
|
self.assertEqual(out.getvalue(), expected_output)
|
||||||
|
|
||||||
def test_print_table(self):
|
def test_print_table(self) -> None:
|
||||||
table = {
|
table = {
|
||||||
"key1": [1, 2, 3, 4],
|
"key1": [1, 2, 3, 4],
|
||||||
"key2": [5, 6, 7, 8],
|
"key2": [5, 6, 7, 8],
|
||||||
@ -58,7 +61,7 @@ key4 13 14 15 16
|
|||||||
stats_print.write_table(table, "SystemCall", out)
|
stats_print.write_table(table, "SystemCall", out)
|
||||||
self.assertEqual(expected_output, out.getvalue())
|
self.assertEqual(expected_output, out.getvalue())
|
||||||
|
|
||||||
def test_print_table_with_shorter_header_and_key_column(self):
|
def test_print_table_with_shorter_header_and_key_column(self) -> None:
|
||||||
table = {"key": [1, 2, 3, 4]}
|
table = {"key": [1, 2, 3, 4]}
|
||||||
# Verifies the width of the first column depends on the header's and
|
# Verifies the width of the first column depends on the header's and
|
||||||
# key's lengths.
|
# key's lengths.
|
||||||
@ -71,7 +74,7 @@ key 1 2 3 4
|
|||||||
stats_print.write_table(table, "SC", out)
|
stats_print.write_table(table, "SC", out)
|
||||||
self.assertEqual(expected_output, out.getvalue())
|
self.assertEqual(expected_output, out.getvalue())
|
||||||
|
|
||||||
def test_format_size(self):
|
def test_format_size(self) -> None:
|
||||||
self.assertEqual("1.5 GB", stats_print.format_size(1500000000))
|
self.assertEqual("1.5 GB", stats_print.format_size(1500000000))
|
||||||
# rounds up
|
# rounds up
|
||||||
self.assertEqual("1.6 GB", stats_print.format_size(1590000000))
|
self.assertEqual("1.6 GB", stats_print.format_size(1590000000))
|
||||||
@ -79,7 +82,7 @@ key 1 2 3 4
|
|||||||
self.assertEqual("12 B", stats_print.format_size(12))
|
self.assertEqual("12 B", stats_print.format_size(12))
|
||||||
self.assertEqual("0", stats_print.format_size(0))
|
self.assertEqual("0", stats_print.format_size(0))
|
||||||
|
|
||||||
def test_time_formats_correctly(self):
|
def test_time_formats_correctly(self) -> None:
|
||||||
self.assertEqual(stats_print.format_time(0), "0 second(s)")
|
self.assertEqual(stats_print.format_time(0), "0 second(s)")
|
||||||
self.assertEqual(stats_print.format_time(30), "30 second(s)")
|
self.assertEqual(stats_print.format_time(30), "30 second(s)")
|
||||||
self.assertEqual(stats_print.format_time(60), "1.0 minute(s)")
|
self.assertEqual(stats_print.format_time(60), "1.0 minute(s)")
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
import pathlib
|
import pathlib
|
||||||
import sys
|
import sys
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from eden.fs.cli.tabulate import tabulate
|
from eden.fs.cli.tabulate import tabulate
|
||||||
@ -13,7 +15,7 @@ eol = ""
|
|||||||
|
|
||||||
|
|
||||||
class TabulateTest(unittest.TestCase):
|
class TabulateTest(unittest.TestCase):
|
||||||
def test_tabulate(self):
|
def test_tabulate(self) -> None:
|
||||||
output = tabulate(
|
output = tabulate(
|
||||||
["a", "b", "c"],
|
["a", "b", "c"],
|
||||||
rows=[
|
rows=[
|
||||||
@ -29,7 +31,7 @@ a_1 b_1 see_1{eol}
|
|||||||
a_two b_2 c_2{eol}""",
|
a_two b_2 c_2{eol}""",
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_tabulate_header_labels(self):
|
def test_tabulate_header_labels(self) -> None:
|
||||||
output = tabulate(
|
output = tabulate(
|
||||||
["a", "b", "c"],
|
["a", "b", "c"],
|
||||||
rows=[
|
rows=[
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import math
|
import math
|
||||||
import typing
|
import typing
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from ..top import Process, format_duration
|
from ..top import Process, format_duration
|
||||||
@ -14,22 +16,22 @@ class TopTest(unittest.TestCase):
|
|||||||
def setUp(self) -> None:
|
def setUp(self) -> None:
|
||||||
self.process = Process(42, "ls", "fbsource")
|
self.process = Process(42, "ls", "fbsource")
|
||||||
|
|
||||||
def test_format_cmd(self):
|
def test_format_cmd(self) -> None:
|
||||||
self.assertEqual("ls", format_cmd(b"/bin/ls"))
|
self.assertEqual("ls", format_cmd(b"/bin/ls"))
|
||||||
self.assertEqual("'chg[worker/0]'", format_cmd(b"chg[worker/0]"))
|
self.assertEqual("'chg[worker/0]'", format_cmd(b"chg[worker/0]"))
|
||||||
|
|
||||||
def test_format_cmd_with_arg(self):
|
def test_format_cmd_with_arg(self) -> None:
|
||||||
self.assertEqual("ls -l", format_cmd(b"/bin/ls\x00-l"), "ls -l")
|
self.assertEqual("ls -l", format_cmd(b"/bin/ls\x00-l"), "ls -l")
|
||||||
self.assertEqual("ls -l 'one two'", format_cmd(b"ls\0-l\0one two"))
|
self.assertEqual("ls -l 'one two'", format_cmd(b"ls\0-l\0one two"))
|
||||||
|
|
||||||
def test_format_cmd_trailing_null(self):
|
def test_format_cmd_trailing_null(self) -> None:
|
||||||
self.assertEqual("ls -l", format_cmd(b"ls\x00-l\x00"), "ls -l")
|
self.assertEqual("ls -l", format_cmd(b"ls\x00-l\x00"), "ls -l")
|
||||||
self.assertEqual("ls -l ''", format_cmd(b"ls\x00-l\x00\x00"), "ls -l ''")
|
self.assertEqual("ls -l ''", format_cmd(b"ls\x00-l\x00\x00"), "ls -l ''")
|
||||||
|
|
||||||
def test_format_mount(self):
|
def test_format_mount(self) -> None:
|
||||||
self.assertEqual(format_mount("/data/users/zuck/fbsource"), "fbsource")
|
self.assertEqual(format_mount("/data/users/zuck/fbsource"), "fbsource")
|
||||||
|
|
||||||
def test_format_duration(self):
|
def test_format_duration(self) -> None:
|
||||||
self.assertEqual(format_duration(1), "1ns")
|
self.assertEqual(format_duration(1), "1ns")
|
||||||
self.assertEqual(format_duration(1 * 1000), "1us")
|
self.assertEqual(format_duration(1 * 1000), "1us")
|
||||||
self.assertEqual(format_duration(1 * 1000 * 1000), "1ms")
|
self.assertEqual(format_duration(1 * 1000 * 1000), "1ms")
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import stat
|
import stat
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
# This software may be used and distributed according to the terms of the
|
# This software may be used and distributed according to the terms of the
|
||||||
# GNU General Public License version 2.
|
# GNU General Public License version 2.
|
||||||
|
|
||||||
|
# pyre-strict
|
||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
import signal
|
import signal
|
||||||
import subprocess
|
import subprocess
|
||||||
@ -43,7 +45,7 @@ class WaitForShutdownTest(unittest.TestCase):
|
|||||||
class AutoReapingChildProcess:
|
class AutoReapingChildProcess:
|
||||||
"""A child process (subprocess.Popen) which is promptly reaped."""
|
"""A child process (subprocess.Popen) which is promptly reaped."""
|
||||||
|
|
||||||
def __init__(self, args) -> None:
|
def __init__(self, args: typing.List[str]) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
self.__condition = threading.Condition()
|
self.__condition = threading.Condition()
|
||||||
@ -65,29 +67,26 @@ class AutoReapingChildProcess:
|
|||||||
with self.__condition:
|
with self.__condition:
|
||||||
while self.__returncode is None:
|
while self.__returncode is None:
|
||||||
self.__condition.wait()
|
self.__condition.wait()
|
||||||
assert self.__returncode is not None
|
returncode = self.__returncode
|
||||||
# pyre-fixme[7]: Expected `int` but got `Optional[int]`.
|
assert returncode is not None
|
||||||
return self.__returncode
|
return returncode
|
||||||
|
|
||||||
def __wait_for_process_start(self) -> None:
|
def __wait_for_process_start(self) -> None:
|
||||||
with self.__condition:
|
with self.__condition:
|
||||||
while self.__pid is None and self.__error is None:
|
while self.__pid is None and self.__error is None:
|
||||||
self.__condition.wait()
|
self.__condition.wait()
|
||||||
if self.__error is not None:
|
error = self.__error
|
||||||
# pyre-fixme[48]: Expression `self.__error` has type
|
if error is not None:
|
||||||
# `Optional[BaseException]` but must extend BaseException.
|
raise error
|
||||||
raise self.__error
|
|
||||||
assert self.__pid is not None
|
assert self.__pid is not None
|
||||||
|
|
||||||
def __start_thread(self, *args, **kwargs) -> None:
|
def __start_thread(self, *args: typing.List[str]) -> None:
|
||||||
thread = threading.Thread(
|
thread = threading.Thread(target=self.__run_thread, args=(args), daemon=True)
|
||||||
target=self.__run_thread, args=(args, kwargs), daemon=True
|
|
||||||
)
|
|
||||||
thread.start()
|
thread.start()
|
||||||
|
|
||||||
def __run_thread(self, popen_args, popen_kwargs) -> None:
|
def __run_thread(self, popen_args: typing.List[str]) -> None:
|
||||||
try:
|
try:
|
||||||
with subprocess.Popen(*popen_args, **popen_kwargs) as process:
|
with subprocess.Popen(popen_args) as process:
|
||||||
with self.__condition:
|
with self.__condition:
|
||||||
self.__pid = process.pid
|
self.__pid = process.pid
|
||||||
self.__condition.notify_all()
|
self.__condition.notify_all()
|
||||||
|
Loading…
Reference in New Issue
Block a user