fairseq/tests/test_file_chunker_utils.py
Pierre Andrews 53bf2b1293 Extract File Chunking to its own utils (#1955)
Summary:
## What does this PR do?

There are a few places where we chunk a single file so it can be processed by multiple workers. However, the code lives partly in `Binarizer` and is partly just duplicated here and there.

This PR extracts the file chunking/reading logic. The multiprocessing logic could probably be extracted too, but I haven't found a good abstraction for it yet.
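For context, here is a minimal sketch of how the extracted helpers are meant to be used, based on the `find_offsets`/`Chunker` API exercised by the test below (the file name, chunk count, and worker loop are illustrative):

```
from fairseq.file_chunker_utils import Chunker, find_offsets

# Split "train.txt" (illustrative path) into byte ranges aligned on
# line boundaries; returns num_chunks + 1 offsets.
offsets = find_offsets("train.txt", 4)

# Each (start, end) pair could be handed to its own worker process.
for start, end in zip(offsets, offsets[1:]):
    with Chunker("train.txt", start, end) as lines:
        for line in lines:
            pass  # process one line of this chunk
```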

# Testing

Added tests for this reading logic, and possibly fixed a bug where the last part of a file could get dropped (although it's unclear whether the current stopping logic ever triggered it).

Tested by running the preprocessing script as follows:
```
python -m fairseq_cli.preprocess --source-lang de --target-lang en --trainpref ...train.spm.clean.de_en --srcdict ...fairseq.dict --tgtdict .../fairseq.dict --destdir ... --workers 60
```

Pull Request resolved: https://github.com/fairinternal/fairseq-py/pull/1955

Reviewed By: myleott

Differential Revision: D29065473

Pulled By: Mortimerp9

fbshipit-source-id: c60843de8cfd45a63b3dbb8290f57ef3df3bf983
2021-06-28 01:46:32 -07:00

# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import shutil
import tempfile
import unittest
from typing import Optional


class TestFileChunker(unittest.TestCase):
    _tmpdir: Optional[str] = None
    _tmpfile: Optional[str] = None
    _line_content = "Hello, World\n"
    _num_bytes = None
    _num_lines = 200
    _num_splits = 20

    @classmethod
    def setUpClass(cls) -> None:
        cls._num_bytes = len(cls._line_content.encode("utf-8"))
        cls._tmpdir = tempfile.mkdtemp()
        with open(os.path.join(cls._tmpdir, "test.txt"), "w") as f:
            cls._tmpfile = f.name
            # Write identical lines so every line has the same byte length,
            # which makes the expected offsets easy to compute.
            for _i in range(cls._num_lines):
                f.write(cls._line_content)
            f.flush()

    @classmethod
    def tearDownClass(cls) -> None:
        # Cleanup temp working dir.
        if cls._tmpdir is not None:
            shutil.rmtree(cls._tmpdir)  # type: ignore

    def test_find_offsets(self):
        from fairseq.file_chunker_utils import find_offsets

        offsets = find_offsets(self._tmpfile, self._num_splits)
        self.assertEqual(len(offsets), self._num_splits + 1)
        (zero, *real_offsets, last) = offsets
        self.assertEqual(zero, 0)
        # Each raw byte boundary falls exactly on a line start here, and
        # find_offsets then advances past one full line to re-align,
        # hence the extra _num_bytes.
        for i, o in enumerate(real_offsets):
            self.assertEqual(
                o,
                self._num_bytes
                + ((i + 1) * self._num_bytes * self._num_lines / self._num_splits),
            )
        self.assertEqual(last, self._num_bytes * self._num_lines)

    def test_readchunks(self):
        from fairseq.file_chunker_utils import Chunker, find_offsets

        offsets = find_offsets(self._tmpfile, self._num_splits)
        for start, end in zip(offsets, offsets[1:]):
            with Chunker(self._tmpfile, start, end) as lines:
                all_lines = list(lines)
                num_lines = self._num_lines / self._num_splits
                # Because we split on bytes, a chunk might end up with one
                # more/less line than the even split.
                self.assertAlmostEqual(len(all_lines), num_lines, delta=1)
                self.assertListEqual(
                    all_lines, [self._line_content for _ in range(len(all_lines))]
                )
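
The test can also be run on its own with the standard unittest runner, e.g. from the repository root (assuming the usual fairseq layout with tests under `tests/`):

```
python -m unittest tests.test_file_chunker_utils -v
```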