fairseq/tests/test_plasma_utils.py
Sam Shleifer 2235f86b40 PlasmaView: don't materialize array in memory (#1645)
Summary:
### Changes:
- `PlasmaArray` saves the underlying data to `self.array`; `PlasmaView` never does that. Instead, it fetches the data from `plasma_store` shared memory when it is needed.
- `PlasmaArray` starts a new, ephemeral plasma_store and puts a new array in it when it is pickled. If `--use-plasma-view`, there is one server started before `spawn` and arrays are only put into it once, in `PlasmaArray.__init__` to accommodate this.
- Users can now pass `--plasma-path` to explicitly control where the server is started.
- We now make plasma keys based on `(split_path, (block_size, document_sep_len, str(break_mode), len(dataset)))`, so two jobs sharing a plasma server but with different datasets, or the same dataset but different command-line args, will not read each other's arrays.

### Results [pre March 1]
This saves some CPU memory (5-15%), according to both `psutil` and `psrecord`.
Here we run `base_cmd` (below) with num_workers=0, 2, 8 on 2 GPUs and collect the logs. `branch` refers to `--use-plasma-view`; `master` uses `PlasmaArray`.

```
+-------------------------+----------------+---------+-------+
| setting                 |   cpu_mem_used |     wps |   ppl |
+=========================+================+=========+=======+
| branch_nw0_gpu2_ddm.log |          12    | 55143.2 | 429.1 |
+-------------------------+----------------+---------+-------+
| branch_nw2_gpu2_ddm.log |          13.67 | 43377.6 | 429.1 |
+-------------------------+----------------+---------+-------+
| branch_nw8_gpu2_ddm.log |          18.36 | 53019.9 | 429.1 |
+-------------------------+----------------+---------+-------+
| master_nw0_gpu2_ddm.log |          12.26 | 56733   | 429.1 |
+-------------------------+----------------+---------+-------+
| master_nw2_gpu2_ddm.log |          14.58 | 53337.9 | 429.1 |
+-------------------------+----------------+---------+-------+
| master_nw8_gpu2_ddm.log |          21.1  | 53217.2 | 429.1 |
+-------------------------+----------------+---------+-------+
```

### Replication

1) get this branch
```bash
git fetch && git checkout share-plasma-server
```

2) Train tiny model and save logs

```bash

base_cmd () {
  fairseq-train --fp16 /private/home/sshleifer/data-bin/stories_mmap \
            --task language_modeling \
            --arch transformer_lm_gpt2_tiny \
            --sample-break-mode complete --tokens-per-sample 512 \
            --optimizer adam --clip-norm 0.0 --lr 0.0005 \
            --batch-size 1 \
            --max-update 200 --max-epoch 1 \
            --log-format simple --log-interval 100 \
            --restore-file x.pt --no-save \
            --skip-invalid-size-inputs-valid-test --disable-validation $@
}

USE_LOCK=1 CUDA_VISIBLE_DEVICES=0,1 base_cmd --num-workers 0 --use-plasma-view | tee branch_nw0_gpu2_ddm.log
```

### TODO:

- [x] test larger dataset
- [x] make it optional, cleanup
- [x] 1 GPU
- [x] unit-tests
- [x] ask hashing Q on stackoverflow https://stackoverflow.com/questions/66354598/deterministic-method-to-hash-np-array-int
- [ ] measure whether the logic that disables `PlasmaArray` for small arrays helps
- [x] test with fb_sweep
- [x] measure 4 GPU savings

Pull Request resolved: https://github.com/fairinternal/fairseq-py/pull/1645

Test Plan: Read github PR description: https://github.com/fairinternal/fairseq-py/pull/1645

Reviewed By: myleott

Differential Revision: D26630365

Pulled By: sshleifer

fbshipit-source-id: b0c4163fbc97a7aefb116de70265fba11f6d7b42
2021-03-12 12:31:12 -08:00

128 lines
4.7 KiB
Python

import contextlib
import unittest
import tempfile
from io import StringIO
import numpy as np
from tests.test_binaries import train_language_model
from tests.utils import create_dummy_data, preprocess_lm_data
# The plasma imports are treated as optional: record whether they succeeded so
# the test class can be skipped instead of failing at import time.
try:
    from pyarrow import plasma
    from fairseq.data.plasma_utils import PlasmaView, PlasmaStore
except ImportError:
    PYARROW_AVAILABLE = False
else:
    PYARROW_AVAILABLE = True

# Second positional argument handed to every PlasmaView built in this module.
dummy_path = "dummy"
@unittest.skipUnless(
    PYARROW_AVAILABLE,
    "pyarrow.plasma / fairseq plasma utilities could not be imported",
)
class TestPlasmaView(unittest.TestCase):
    """Exercise ``PlasmaView`` against a real plasma store server.

    ``setUp`` starts one store at a temporary path and connects a client to
    it; individual tests may start additional short-lived stores. Every
    store or client a test creates is released in a ``finally`` clause so a
    failing assertion does not leave a server process or connection behind.
    """

    def setUp(self) -> None:
        """Start a plasma store on a fresh temp path and connect to it."""
        self.tmp_file = tempfile.NamedTemporaryFile()  # noqa: P201
        self.path = self.tmp_file.name
        self.server = PlasmaStore.start(path=self.path)
        self.client = plasma.connect(self.path, num_retries=10)

    def tearDown(self) -> None:
        """Disconnect the client, drop the temp file and kill the server."""
        try:
            self.client.disconnect()
            self.tmp_file.close()
        finally:
            # Kill the server even if disconnecting or closing raised.
            self.server.kill()

    def test_two_servers_do_not_share_object_id_space(self) -> None:
        """The same hash key on two different stores yields independent data."""
        data_server_1 = np.array([0, 1])
        data_server_2 = np.array([2, 3])
        server_2_path = self.path
        with tempfile.NamedTemporaryFile() as server_1_path:
            server = PlasmaStore.start(path=server_1_path.name, nbytes=10000)
            try:
                arr1 = PlasmaView(
                    data_server_1, dummy_path, 1, plasma_path=server_1_path.name
                )
                assert len(arr1.client.list()) == 1
                assert (arr1.array == data_server_1).all()
                arr2 = PlasmaView(
                    data_server_2, dummy_path, 1, plasma_path=server_2_path
                )
                assert (arr2.array == data_server_2).all()
                assert (arr1.array == data_server_1).all()
            finally:
                # Do not leak the extra server when an assertion above fails.
                server.kill()

    def test_hash_collision(self) -> None:
        """Identical (path, hash key) reuses the stored object; a new key adds one."""
        data_server_1 = np.array([0, 1])
        data_server_2 = np.array([2, 3])
        arr1 = PlasmaView(data_server_1, dummy_path, 1, plasma_path=self.path)
        assert len(arr1.client.list()) == 1
        # Same path and hash key as arr1: no new object is stored, so arr2
        # reads back arr1's data rather than data_server_2.
        arr2 = PlasmaView(data_server_2, dummy_path, 1, plasma_path=self.path)
        assert len(arr1.client.list()) == 1
        assert len(arr2.client.list()) == 1
        assert (arr2.array == data_server_1).all()
        # New hash key based on tuples
        arr3 = PlasmaView(
            data_server_2, dummy_path, (1, 12312312312, None), plasma_path=self.path
        )
        assert (
            len(arr2.client.list()) == 2
        ), "No new object was created by using a novel hash key"
        assert (
            arr3.object_id in arr2.client.list()
        ), "No new object was created by using a novel hash key"
        assert (
            arr3.object_id in arr3.client.list()
        ), "No new object was created by using a novel hash key"
        del arr3, arr2, arr1

    @staticmethod
    def _assert_view_equal(pv1, pv2):
        """Assert that two views expose element-wise equal arrays."""
        np.testing.assert_array_equal(pv1.array, pv2.array)

    def test_putting_same_array_twice(self) -> None:
        """Re-using a hash key leaves the store unchanged; a new key adds an entry."""
        data = np.array([4, 4, 4])
        arr1 = PlasmaView(data, dummy_path, 1, plasma_path=self.path)
        assert len(self.client.list()) == 1
        arr1b = PlasmaView(
            data, dummy_path, 1, plasma_path=self.path
        )  # should not change contents of store
        arr1c = PlasmaView(
            None, dummy_path, 1, plasma_path=self.path
        )  # should not change contents of store
        assert len(self.client.list()) == 1
        self._assert_view_equal(arr1, arr1b)
        self._assert_view_equal(arr1, arr1c)
        PlasmaView(
            data, dummy_path, 2, plasma_path=self.path
        )  # new object id, adds new entry
        assert len(self.client.list()) == 2
        new_client = plasma.connect(self.path)
        try:
            assert len(new_client.list()) == 2  # new client can access same objects
        finally:
            # The extra connection is only needed for the check above.
            new_client.disconnect()
        assert isinstance(arr1.object_id, plasma.ObjectID)
        del arr1b
        del arr1c

    def test_plasma_store_full_raises(self) -> None:
        """Putting an array larger than the store raises ``PlasmaStoreFull``."""
        with tempfile.NamedTemporaryFile() as new_path:
            server = PlasmaStore.start(path=new_path.name, nbytes=10000)
            try:
                with self.assertRaises(plasma.PlasmaStoreFull):
                    # 10000 float64 values (80000 bytes) exceed the
                    # 10000-byte store started above.
                    PlasmaView(
                        np.random.rand(10000, 1),
                        dummy_path,
                        1,
                        plasma_path=new_path.name,
                    )
            finally:
                # Kill the server even if the expected exception is not raised.
                server.kill()

    def test_object_id_overflow(self) -> None:
        """``get_object_id`` must not raise for a large integer hash input."""
        PlasmaView.get_object_id("", 2 ** 21)

    def test_training_lm_plasma(self) -> None:
        """Train a small LM end to end with ``--use-plasma-view`` on this store."""
        with contextlib.redirect_stdout(StringIO()):
            with tempfile.TemporaryDirectory("test_transformer_lm") as data_dir:
                create_dummy_data(data_dir)
                preprocess_lm_data(data_dir)
                train_language_model(
                    data_dir,
                    "transformer_lm",
                    ["--use-plasma-view", "--plasma-path", self.path],
                    run_validation=True,
                )