Move fairseq.distributed_utils -> fairseq.distributed.utils (#1547)

Summary: Pull Request resolved: https://github.com/fairinternal/fairseq-py/pull/1547

Test Plan: Imported from OSS

Reviewed By: girifb

Differential Revision: D25836855

Pulled By: myleott

fbshipit-source-id: addd8a7fe8dac43252b100d7331e04e95f555781
Authored by Myle Ott on 2021-01-28 14:18:48 -08:00; committed by Facebook GitHub Bot
parent d68a3530dd
commit 27b96eb698
12 changed files with 200 additions and 167 deletions

View File

@@ -9,7 +9,7 @@ from dataclasses import dataclass, field
from typing import List, Optional, Tuple
import torch
from fairseq import distributed_utils as dist_utils, utils
from fairseq import utils
from fairseq.data import (
Dictionary,
TokenBlockDataset,
@@ -17,6 +17,7 @@ from fairseq.data import (
iterators,
)
from fairseq.dataclass import FairseqDataclass
from fairseq.distributed import utils as dist_utils
from fairseq.tasks import FairseqTask, register_task
from omegaconf import II

View File

@@ -16,9 +16,11 @@ except ImportError:
__all__ = ["pdb"]
# backwards compatibility to support `from fairseq.meters import AverageMeter`
# backwards compatibility to support `from fairseq.X import Y`
from fairseq.distributed import utils as distributed_utils
from fairseq.logging import meters, metrics, progress_bar # noqa
sys.modules["fairseq.distributed_utils"] = distributed_utils
sys.modules["fairseq.meters"] = meters
sys.modules["fairseq.metrics"] = metrics
sys.modules["fairseq.progress_bar"] = progress_bar
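For context, the sys.modules aliasing above is what keeps the pre-move import paths working; a minimal check (assuming fairseq is installed; not part of this diff):

from fairseq import distributed_utils              # old-style package attribute
from fairseq.distributed_utils import all_reduce   # old-style submodule import
from fairseq.distributed import utils              # new canonical path

assert distributed_utils is utils
assert all_reduce is utils.all_reduce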

View File

@@ -14,8 +14,8 @@ from typing import List
import numpy as np
import torch
from fairseq import distributed_utils
from fairseq.data import FairseqDataset, data_utils
from fairseq.distributed import utils as distributed_utils
def get_time_gap(s, e):

View File

@@ -20,7 +20,7 @@ from contextlib import contextmanager
import torch
from torch import nn
from fairseq import distributed_utils
from fairseq.distributed import utils
class LegacyDistributedDataParallel(nn.Module):
@@ -43,7 +43,7 @@ class LegacyDistributedDataParallel(nn.Module):
self.module = module
self.process_group = process_group
self.world_size = distributed_utils.get_world_size(self.process_group)
self.world_size = utils.get_world_size(self.process_group)
# Never use a bigger buffer than the number of model params
self.buffer_size = min(buffer_size, sum(p.numel() for p in module.parameters()))
@@ -107,7 +107,7 @@ class LegacyDistributedDataParallel(nn.Module):
if nonzero_buffer:
buffer.div_(self.world_size)
distributed_utils.all_reduce(buffer, self.process_group)
utils.all_reduce(buffer, self.process_group)
# copy all-reduced grads back into their original place
offset = 0
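The hunk above keeps the buffered all-reduce logic and only swaps in the relocated helper module. As a rough sketch of the same flatten → all-reduce → copy-back pattern (assumptions: plain torch.distributed, an already-initialized process group, and a preallocated flat buffer; this is not fairseq's exact implementation):

import torch
import torch.distributed as dist

def allreduce_grads_buffered(params, buffer, process_group=None):
    # Flatten per-parameter gradients into one contiguous buffer.
    offset = 0
    for p in params:
        numel = p.grad.numel()
        buffer[offset:offset + numel].copy_(p.grad.reshape(-1))
        offset += numel
    # Pre-divide by world size so the summed result is an average.
    buffer[:offset].div_(dist.get_world_size(group=process_group))
    dist.all_reduce(buffer[:offset], group=process_group)
    # Copy the averaged gradients back into their original tensors.
    offset = 0
    for p in params:
        numel = p.grad.numel()
        p.grad.copy_(buffer[offset:offset + numel].view_as(p.grad))
        offset += numel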

View File

@@ -6,7 +6,7 @@
import torch
from torch import nn
from fairseq import distributed_utils
from fairseq.distributed import utils
class TPUDistributedDataParallel(nn.Module):
@@ -15,7 +15,7 @@ class TPUDistributedDataParallel(nn.Module):
super().__init__()
self.module = module
self.process_group = process_group
self.world_size = distributed_utils.get_world_size(self.process_group)
self.world_size = utils.get_world_size(self.process_group)
def forward(self, *inputs, **kwargs):
return self.module(*inputs, **kwargs)
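For orientation (not part of the diff), constructing this wrapper looks roughly like the sketch below; the fairseq.distributed export of TPUDistributedDataParallel and the use of the global process group are assumptions, and an initialized distributed/XLA backend is required:

import torch.nn as nn
from fairseq.distributed import utils as dist_utils
from fairseq.distributed import TPUDistributedDataParallel  # assumed export path

model = nn.Linear(8, 8)
wrapped = TPUDistributedDataParallel(
    module=model,
    process_group=dist_utils.get_global_group(),  # global group (assumption)
)
# forward() simply delegates to the wrapped module, as shown above.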

View File

@@ -19,7 +19,6 @@ from typing import Any, Dict, List, Mapping, Optional
import torch
import torch.distributed as dist
from fairseq import utils
from fairseq.dataclass.configs import DistributedTrainingConfig, FairseqConfig
from omegaconf import open_dict
@@ -49,164 +48,23 @@ def infer_init_method(cfg: DistributedTrainingConfig, force_distributed=False):
return
if cfg.pipeline_model_parallel:
balance_exists = (
cfg.pipeline_balance is not None
or cfg.pipeline_encoder_balance is not None
or cfg.pipeline_decoder_balance is not None
)
devices_exist = (
cfg.pipeline_devices is not None
or cfg.pipeline_encoder_devices is not None
or cfg.pipeline_decoder_devices is not None
)
if not balance_exists:
raise ValueError(
"--pipeline-balance is currently required for pipeline model parallelism"
)
if not devices_exist:
raise ValueError(
"--pipeline-devices is currently required for pipeline model parallelism"
)
num_pipeline_devices, num_pipelines_per_node = _pipeline_parallel_pre_init(cfg)
cfg.pipeline_balance = utils.eval_str_list(cfg.pipeline_balance, type=int)
if cfg.pipeline_devices is not None:
cfg.pipeline_devices = utils.eval_str_list(cfg.pipeline_devices, type=int)
num_pipeline_devices = len(set(cfg.pipeline_devices))
else:
cfg.pipeline_encoder_devices = utils.eval_str_list(
cfg.pipeline_encoder_devices, type=int
)
cfg.pipeline_decoder_devices = utils.eval_str_list(
cfg.pipeline_decoder_devices, type=int
)
num_pipeline_devices = len(
set(cfg.pipeline_encoder_devices + cfg.pipeline_decoder_devices)
)
gpus_per_node = torch.cuda.device_count()
assert (
gpus_per_node >= num_pipeline_devices
and gpus_per_node % num_pipeline_devices == 0
), (
"the number of unique device IDs in --pipeline-devices must evenly divide "
"the number of GPUs per node (multi-node pipelining is not yet supported)"
)
num_pipelines_per_node = gpus_per_node // num_pipeline_devices
# support torch.distributed.launch
if all(
key in os.environ
for key in ["MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "RANK"]
):
cfg.distributed_init_method = "env://"
cfg.distributed_world_size = int(os.environ["WORLD_SIZE"])
cfg.distributed_rank = int(os.environ["RANK"])
# processes are created by torch.distributed.launch
cfg.distributed_no_spawn = True
# we can determine the init method automatically for Slurm
# support torch.distributed.launch
_infer_torch_distributed_launch_init(cfg)
elif cfg.distributed_port > 0:
node_list = os.environ.get("SLURM_STEP_NODELIST")
if node_list is None:
node_list = os.environ.get("SLURM_JOB_NODELIST")
if node_list is not None:
try:
hostnames = subprocess.check_output(
["scontrol", "show", "hostnames", node_list]
)
cfg.distributed_init_method = "tcp://{host}:{port}".format(
host=hostnames.split()[0].decode("utf-8"),
port=cfg.distributed_port,
)
nnodes = int(os.environ.get("SLURM_NNODES"))
ntasks_per_node = os.environ.get("SLURM_NTASKS_PER_NODE")
if ntasks_per_node is not None:
ntasks_per_node = int(ntasks_per_node)
else:
ntasks = int(os.environ.get("SLURM_NTASKS"))
nnodes = int(os.environ.get("SLURM_NNODES"))
assert ntasks % nnodes == 0
ntasks_per_node = int(ntasks / nnodes)
if ntasks_per_node == 1:
gpus_per_node = torch.cuda.device_count()
node_id = int(os.environ.get("SLURM_NODEID"))
cfg.distributed_rank = node_id * gpus_per_node
cfg.distributed_world_size = nnodes * gpus_per_node
elif cfg.pipeline_model_parallel:
assert ntasks_per_node == num_pipelines_per_node, (
"SLURM --ntasks-per-node must match number of pipelines per "
"node (={})".format(num_pipelines_per_node)
)
cfg.distributed_no_spawn = True
# For 4-way MP on nodes with 8 GPUs, ranks will be [0, 1] on
# the first node, [1, 2] on the second node, etc. This
# matches torch.distributed.launch.
node_id = int(os.environ.get("SLURM_NODEID"))
local_id = int(os.environ.get("SLURM_LOCALID"))
cfg.distributed_rank = node_id * num_pipelines_per_node + local_id
# In the above example, device_id will always be in [0, 1],
# which also matches torch.distributed.launch.
cfg.device_id = local_id
# We also want to set distributed_world_size to be the total
# number of pipelines across all nodes.
cfg.distributed_world_size = nnodes * num_pipelines_per_node
else:
assert ntasks_per_node == cfg.distributed_world_size // nnodes
cfg.distributed_no_spawn = True
cfg.distributed_rank = int(os.environ.get("SLURM_PROCID"))
cfg.device_id = int(os.environ.get("SLURM_LOCALID"))
except subprocess.CalledProcessError as e: # scontrol failed
raise e
except FileNotFoundError: # Slurm is not installed
pass
# we can determine the init method automatically for Slurm
_infer_slurm_init(cfg, num_pipelines_per_node)
elif cfg.distributed_world_size > 1 or force_distributed:
# fallback for single node with multiple GPUs
assert (
cfg.distributed_world_size <= torch.cuda.device_count()
), f"world size is {cfg.distributed_world_size} but have {torch.cuda.device_count()} available devices"
port = random.randint(10000, 20000)
cfg.distributed_init_method = "tcp://localhost:{port}".format(port=port)
_infer_single_node_init(cfg)
if cfg.pipeline_model_parallel:
if not cfg.distributed_no_spawn:
# When distributed_no_spawn is False, we expect distributed_rank and
# distributed_world_size to be based on the total number of GPUs, so
# we need to correct them to be based on the number of pipelines.
assert cfg.distributed_world_size % num_pipeline_devices == 0
cfg.distributed_world_size = (
cfg.distributed_world_size // num_pipeline_devices
)
# In the case of 4-way MP on nodes with 8 GPUs, we want
# distributed_rank to be the starting GPU index for each pipeline
# i.e., 0, 2, ...
assert cfg.distributed_rank % gpus_per_node == 0
assert cfg.distributed_rank % num_pipeline_devices == 0
with open_dict(cfg):
cfg.distributed_rank = cfg.distributed_rank // num_pipeline_devices
# launch one process per pipeline
cfg.distributed_num_procs = num_pipelines_per_node
# if we have 4-way MP on a node with 8 GPUs, we want device_ids to be 0
# and 4, indicating the starting device IDs for each pipeline
cfg.device_id *= num_pipeline_devices
if cfg.device_id > 0:
# if there's multiple pipelines on a node (e.g., 4-way MP on an 8
# GPU node), we need to adjust pipeline_devices accordingly
logger.debug(
"setting CUDA device={} on rank {}".format(
cfg.device_id, cfg.distributed_rank
)
)
torch.cuda.set_device(cfg.device_id)
with open_dict(cfg):
cfg.pipeline_devices = [cfg.device_id + d for d in cfg.pipeline_devices]
logger.info(
"setting pipeline_devices={} on rank {}".format(
cfg.pipeline_devices, cfg.distributed_rank
)
)
_pipeline_parallel_post_init(cfg, num_pipeline_devices, num_pipelines_per_node)
elif not cfg.distributed_no_spawn:
with open_dict(cfg):
cfg.distributed_num_procs = min(
@@ -214,6 +72,171 @@ def infer_init_method(cfg: DistributedTrainingConfig, force_distributed=False):
)
def _infer_torch_distributed_launch_init(cfg: DistributedTrainingConfig):
cfg.distributed_init_method = "env://"
cfg.distributed_world_size = int(os.environ["WORLD_SIZE"])
cfg.distributed_rank = int(os.environ["RANK"])
# processes are created by torch.distributed.launch
cfg.distributed_no_spawn = True
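# (Not part of the diff; illustrative.) torch.distributed.launch exports
# MASTER_ADDR, MASTER_PORT, WORLD_SIZE and RANK for every worker, so e.g.
# WORLD_SIZE=8 and RANK=3 yield cfg.distributed_world_size = 8 and
# cfg.distributed_rank = 3 here.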
def _infer_slurm_init(cfg: DistributedTrainingConfig, num_pipelines_per_node):
node_list = os.environ.get("SLURM_STEP_NODELIST")
if node_list is None:
node_list = os.environ.get("SLURM_JOB_NODELIST")
if node_list is not None:
try:
hostnames = subprocess.check_output(
["scontrol", "show", "hostnames", node_list]
)
cfg.distributed_init_method = "tcp://{host}:{port}".format(
host=hostnames.split()[0].decode("utf-8"),
port=cfg.distributed_port,
)
nnodes = int(os.environ.get("SLURM_NNODES"))
ntasks_per_node = os.environ.get("SLURM_NTASKS_PER_NODE")
if ntasks_per_node is not None:
ntasks_per_node = int(ntasks_per_node)
else:
ntasks = int(os.environ.get("SLURM_NTASKS"))
nnodes = int(os.environ.get("SLURM_NNODES"))
assert ntasks % nnodes == 0
ntasks_per_node = int(ntasks / nnodes)
if ntasks_per_node == 1:
gpus_per_node = torch.cuda.device_count()
node_id = int(os.environ.get("SLURM_NODEID"))
cfg.distributed_rank = node_id * gpus_per_node
cfg.distributed_world_size = nnodes * gpus_per_node
elif cfg.pipeline_model_parallel:
assert ntasks_per_node == num_pipelines_per_node, (
"SLURM --ntasks-per-node must match number of pipelines per "
"node (={})".format(num_pipelines_per_node)
)
cfg.distributed_no_spawn = True
# For 4-way MP on nodes with 8 GPUs, ranks will be [0, 1] on
# the first node, [1, 2] on the second node, etc. This
# matches torch.distributed.launch.
node_id = int(os.environ.get("SLURM_NODEID"))
local_id = int(os.environ.get("SLURM_LOCALID"))
cfg.distributed_rank = node_id * num_pipelines_per_node + local_id
# In the above example, device_id will always be in [0, 1],
# which also matches torch.distributed.launch.
cfg.device_id = local_id
# We also want to set distributed_world_size to be the total
# number of pipelines across all nodes.
cfg.distributed_world_size = nnodes * num_pipelines_per_node
else:
assert ntasks_per_node == cfg.distributed_world_size // nnodes
cfg.distributed_no_spawn = True
cfg.distributed_rank = int(os.environ.get("SLURM_PROCID"))
cfg.device_id = int(os.environ.get("SLURM_LOCALID"))
except subprocess.CalledProcessError as e: # scontrol failed
raise e
except FileNotFoundError: # Slurm is not installed
pass
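# (Not part of the diff; illustrative.) With 2 nodes and 8 tasks per node,
# SLURM_NNODES=2 and SLURM_NTASKS_PER_NODE=8; in the non-pipeline branch above,
# a worker with SLURM_PROCID=11 and SLURM_LOCALID=3 gets
# cfg.distributed_rank = 11 and cfg.device_id = 3.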
def _infer_single_node_init(cfg: DistributedTrainingConfig):
assert (
cfg.distributed_world_size <= torch.cuda.device_count()
), f"world size is {cfg.distributed_world_size} but have {torch.cuda.device_count()} available devices"
port = random.randint(10000, 20000)
cfg.distributed_init_method = "tcp://localhost:{port}".format(port=port)
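# (Not part of the diff; illustrative.) On a single machine with
# --distributed-world-size 4 and at least 4 visible GPUs, this picks a random
# local port, e.g. cfg.distributed_init_method = "tcp://localhost:13357".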
def _pipeline_parallel_pre_init(cfg: DistributedTrainingConfig):
from fairseq import utils
balance_exists = (
cfg.pipeline_balance is not None
or cfg.pipeline_encoder_balance is not None
or cfg.pipeline_decoder_balance is not None
)
devices_exist = (
cfg.pipeline_devices is not None
or cfg.pipeline_encoder_devices is not None
or cfg.pipeline_decoder_devices is not None
)
if not balance_exists:
raise ValueError(
"--pipeline-balance is currently required for pipeline model parallelism"
)
if not devices_exist:
raise ValueError(
"--pipeline-devices is currently required for pipeline model parallelism"
)
cfg.pipeline_balance = utils.eval_str_list(cfg.pipeline_balance, type=int)
if cfg.pipeline_devices is not None:
cfg.pipeline_devices = utils.eval_str_list(cfg.pipeline_devices, type=int)
num_pipeline_devices = len(set(cfg.pipeline_devices))
else:
cfg.pipeline_encoder_devices = utils.eval_str_list(
cfg.pipeline_encoder_devices, type=int
)
cfg.pipeline_decoder_devices = utils.eval_str_list(
cfg.pipeline_decoder_devices, type=int
)
num_pipeline_devices = len(
set(cfg.pipeline_encoder_devices + cfg.pipeline_decoder_devices)
)
gpus_per_node = torch.cuda.device_count()
assert (
gpus_per_node >= num_pipeline_devices
and gpus_per_node % num_pipeline_devices == 0
), (
"the number of unique device IDs in --pipeline-devices must evenly divide "
"the number of GPUs per node (multi-node pipelining is not yet supported)"
)
num_pipelines_per_node = gpus_per_node // num_pipeline_devices
return num_pipeline_devices, num_pipelines_per_node
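# (Not part of the diff; illustrative.) With --pipeline-devices "0,1,2,3" on
# 8-GPU nodes, num_pipeline_devices = 4 unique device IDs and
# num_pipelines_per_node = 8 // 4 = 2.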
def _pipeline_parallel_post_init(
cfg: DistributedTrainingConfig, num_pipeline_devices, num_pipelines_per_node
):
if not cfg.distributed_no_spawn:
# When distributed_no_spawn is False, we expect distributed_rank and
# distributed_world_size to be based on the total number of GPUs, so
# we need to correct them to be based on the number of pipelines.
assert cfg.distributed_world_size % num_pipeline_devices == 0
cfg.distributed_world_size = (
cfg.distributed_world_size // num_pipeline_devices
)
# In the case of 4-way MP on nodes with 8 GPUs, we want
# distributed_rank to be the starting GPU index for each pipeline
# i.e., 0, 2, ...
gpus_per_node = torch.cuda.device_count()
assert cfg.distributed_rank % gpus_per_node == 0
assert cfg.distributed_rank % num_pipeline_devices == 0
with open_dict(cfg):
cfg.distributed_rank = cfg.distributed_rank // num_pipeline_devices
# launch one process per pipeline
cfg.distributed_num_procs = num_pipelines_per_node
# if we have 4-way MP on a node with 8 GPUs, we want device_ids to be 0
# and 4, indicating the starting device IDs for each pipeline
cfg.device_id *= num_pipeline_devices
if cfg.device_id > 0:
# if there's multiple pipelines on a node (e.g., 4-way MP on an 8
# GPU node), we need to adjust pipeline_devices accordingly
logger.debug(
"setting CUDA device={} on rank {}".format(
cfg.device_id, cfg.distributed_rank
)
)
torch.cuda.set_device(cfg.device_id)
with open_dict(cfg):
cfg.pipeline_devices = [cfg.device_id + d for d in cfg.pipeline_devices]
logger.info(
"setting pipeline_devices={} on rank {}".format(
cfg.pipeline_devices, cfg.distributed_rank
)
)
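# (Not part of the diff; illustrative, continuing the example above.) With
# num_pipeline_devices = 4 and num_pipelines_per_node = 2 on two 8-GPU nodes,
# a spawn-mode world size of 16 GPUs becomes 16 // 4 = 4 pipeline processes,
# and a local device_id of 1 becomes 1 * 4 = 4, so the two pipelines on each
# node start on GPUs 0 and 4.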
def distributed_init(cfg: FairseqConfig):
if isinstance(cfg, Namespace):
from fairseq.dataclass.utils import convert_namespace_to_omegaconf
@@ -537,6 +560,8 @@ def all_gather_list(data, group=None, max_size=16384):
max_size (int, optional): maximum size of the data to be gathered
across workers
"""
from fairseq import utils
if group is None:
group = get_global_group()
rank = get_rank(group=group)
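Not part of the diff: a hedged usage sketch for all_gather_list as relocated here, assuming torch.distributed has already been initialized (for example via distributed_init above); the payload contents are hypothetical:

from fairseq.distributed import utils as distributed_utils

group = distributed_utils.get_global_group()
payload = {"rank": distributed_utils.get_rank(group=group), "n_updates": 42}
# Returns a list with one gathered (pickle-serializable) entry per worker.
gathered = distributed_utils.all_gather_list(payload, group=group, max_size=16384)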

View File

@@ -7,9 +7,9 @@
Train a network across multiple GPUs.
"""
from fairseq import distributed_utils
from fairseq.trainer import Trainer
from fairseq.dataclass.configs import FairseqConfig
from fairseq.distributed import utils as distributed_utils
from fairseq.trainer import Trainer
try:
from fairseq.model_parallel.megatron.mpu import (

View File

@@ -12,7 +12,6 @@ import torch
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel
from fairseq import distributed_utils
from fairseq.distributed import (
DistributedTimeoutWrapper,
LegacyDistributedDataParallel,

View File

@@ -16,9 +16,10 @@ from itertools import chain
from typing import Any, Dict, List
import torch
from fairseq import checkpoint_utils, distributed_utils, models, optim, utils
from fairseq import checkpoint_utils, models, optim, utils
from fairseq.dataclass.configs import FairseqConfig
from fairseq.dataclass.utils import convert_namespace_to_omegaconf
from fairseq.distributed import utils as distributed_utils
from fairseq.file_io import PathManager
from fairseq.logging import meters, metrics
from fairseq.nan_detector import NanDetector

View File

@@ -17,10 +17,6 @@ from typing import Callable, Dict, List, Optional
import torch
import torch.nn.functional as F
from fairseq.data import iterators
from fairseq.file_io import PathManager
from fairseq.logging.meters import safe_round
from fairseq.modules import gelu, gelu_accurate
from fairseq.modules.multihead_attention import MultiheadAttention
from torch import Tensor
@@ -51,6 +47,8 @@ class FileContentsAction(argparse.Action):
super(FileContentsAction, self).__init__(option_strings, dest, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
from fairseq.file_io import PathManager
if PathManager.isfile(values):
with PathManager.open(values) as f:
argument = f.read().strip()
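# (Not part of the diff.) A short usage sketch of FileContentsAction with the
# deferred PathManager import above; the flag name and values are hypothetical,
# and the fallback for non-file values is inferred rather than shown here.
import argparse
from fairseq.utils import FileContentsAction

parser = argparse.ArgumentParser()
parser.add_argument("--langs", action=FileContentsAction)
# If the value names an existing file, its stripped contents are substituted;
# otherwise the literal string is kept as-is.
args = parser.parse_args(["--langs", "en,de,fr"])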
@@ -482,6 +480,8 @@ def log_softmax(x, dim: int, onnx_trace: bool = False):
def get_perplexity(loss, round=2, base=2):
from fairseq.logging.meters import safe_round
if loss is None:
return 0.0
try:
@@ -497,6 +497,8 @@ def deprecation_warning(message, stacklevel=3):
def get_activation_fn(activation: str) -> Callable:
""" Returns the activation function corresponding to `activation` """
from fairseq.modules import gelu, gelu_accurate
if activation == "relu":
return F.relu
elif activation == "gelu":
@@ -665,6 +667,7 @@ def get_tpu_device():
def tpu_data_loader(itr):
import torch_xla.core.xla_model as xm
import torch_xla.distributed.parallel_loader as pl
from fairseq.data import iterators
xm.rendezvous("tpu_data_loader") # wait for all workers
xm.mark_step()

View File

@@ -11,9 +11,11 @@ from multiprocessing import Manager
import torch
import torch.nn as nn
from fairseq import distributed_utils, optim
from fairseq import optim
from fairseq.distributed import utils as distributed_utils
from omegaconf import OmegaConf
class Model(nn.Module):
def __init__(self, input_size, output_size):
super(Model, self).__init__()

View File

@@ -9,7 +9,7 @@ import torch
import torch
from fairseq import distributed_utils as dist_utils
from fairseq.distributed import utils as dist_utils
from .utils import objects_are_equal, spawn_and_init