This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e0b09a17e13 [SPARK-41777][PYSPARK][ML] Integration testing for TorchDistributor
e0b09a17e13 is described below
commit e0b09a17e136617002e2b8f3b813299659652340
Author: Rithwik Ediga Lakhamsani <[email protected]>
AuthorDate: Sat Jan 21 16:22:35 2023 +0900
[SPARK-41777][PYSPARK][ML] Integration testing for TorchDistributor
See the latest commit in this PR for the most accurate diff.
### What changes were proposed in this pull request?
Added integration tests for running distributed training on files.
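For reference, the new tests drive the run-from-file path roughly like this (a minimal sketch; the script path and learning-rate argument mirror the tests added below):

    from pyspark.ml.torch.distributor import TorchDistributor

    # Launch a training script file on 2 local CPU processes; extra
    # positional arguments ("0.01" here) are forwarded to the script's argv.
    TorchDistributor(num_processes=2, local_mode=True, use_gpu=False).run(
        "python/test_support/test_pytorch_training_file.py", "0.01"
    )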
### Why are the changes needed?
These tests give end-to-end coverage of TorchDistributor's run-from-file path in both local and distributed modes, complementing the existing unit tests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Via the new integration tests added in this PR.
Closes #39637 from rithwik-db/integration-testing.
Authored-by: Rithwik Ediga Lakhamsani <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
dev/infra/Dockerfile | 3 +
dev/requirements.txt | 4 +
python/pyspark/ml/torch/tests/test_distributor.py | 14 +++
.../pyspark/ml/torch/torch_run_process_wrapper.py | 2 +
python/test_support/test_pytorch_training_file.py | 115 +++++++++++++++++++++
5 files changed, 138 insertions(+)
diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile
index f226058f186..cce4c0cf830 100644
--- a/dev/infra/Dockerfile
+++ b/dev/infra/Dockerfile
@@ -69,3 +69,6 @@ RUN python3.9 -m pip install numpy pyarrow 'pandas<=1.5.3' scipy unittest-xml-re
# Add Python deps for Spark Connect.
RUN python3.9 -m pip install grpcio protobuf googleapis-common-protos grpcio-status
+
+# Add torch as a testing dependency for TorchDistributor
+RUN python3.9 -m pip install torch torchvision
diff --git a/dev/requirements.txt b/dev/requirements.txt
index 1d978c4602c..77a508621fb 100644
--- a/dev/requirements.txt
+++ b/dev/requirements.txt
@@ -59,3 +59,7 @@ googleapis-common-protos==1.56.4
mypy-protobuf==3.3.0
googleapis-common-protos-stubs==2.2.0
grpc-stubs==1.24.11
+
+# TorchDistributor dependencies
+torch==1.13.1
+torchvision==0.14.1
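A quick sanity check that the new dependencies resolve, in either the CI image or the dev environment (a convenience snippet, not part of this change):

    # Confirm the torch/torchvision wheels import; versions per dev/requirements.txt.
    import torch
    import torchvision

    print(torch.__version__)        # expected 1.13.1 in the dev environment
    print(torchvision.__version__)  # expected 0.14.1 in the dev environment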
diff --git a/python/pyspark/ml/torch/tests/test_distributor.py b/python/pyspark/ml/torch/tests/test_distributor.py
index 619e733c0bb..747229cb9fd 100644
--- a/python/pyspark/ml/torch/tests/test_distributor.py
+++ b/python/pyspark/ml/torch/tests/test_distributor.py
@@ -289,6 +289,13 @@ class TorchDistributorLocalUnitTests(unittest.TestCase):
        if cuda_env_var:
            self.delete_env_vars({CUDA_VISIBLE_DEVICES: cuda_env_var})
+    def test_local_file_with_pytorch(self) -> None:
+        test_file_path = "python/test_support/test_pytorch_training_file.py"
+        learning_rate_str = "0.01"
+        TorchDistributor(num_processes=2, local_mode=True, use_gpu=False).run(
+            test_file_path, learning_rate_str
+        )
+
class TorchDistributorDistributedUnitTests(unittest.TestCase):
    def setUp(self) -> None:
@@ -350,6 +357,13 @@ class TorchDistributorDistributedUnitTests(unittest.TestCase):
        self.spark.sparkContext._conf.set("spark.task.resource.gpu.amount", "1")
+    def test_distributed_file_with_pytorch(self) -> None:
+        test_file_path = "python/test_support/test_pytorch_training_file.py"
+        learning_rate_str = "0.01"
+        TorchDistributor(num_processes=2, local_mode=False, use_gpu=False).run(
+            test_file_path, learning_rate_str
+        )
+
class TorchWrapperUnitTests(unittest.TestCase):
    def test_clean_and_terminate(self) -> None:
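Both tests hand the script path and its CLI arguments to run(); per the wrapper change below, this ultimately becomes a torch.distributed.run launch, roughly equivalent to the following (a sketch only; the real command also carries rendezvous and environment flags that TorchDistributor sets up):

    import subprocess
    import sys

    # Approximate equivalent of the local-mode test launch (illustrative only).
    subprocess.run(
        [
            sys.executable, "-m", "torch.distributed.run",
            "--nproc_per_node=2",  # matches num_processes=2
            "python/test_support/test_pytorch_training_file.py",
            "0.01",  # arrives in the script as args.lr
        ],
        check=True,
    )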
diff --git a/python/pyspark/ml/torch/torch_run_process_wrapper.py b/python/pyspark/ml/torch/torch_run_process_wrapper.py
index 6b5b6a1d0be..7439d09d0c0 100644
--- a/python/pyspark/ml/torch/torch_run_process_wrapper.py
+++ b/python/pyspark/ml/torch/torch_run_process_wrapper.py
@@ -52,6 +52,8 @@ if __name__ == "__main__":
    cmd = [sys.executable, "-m", "torch.distributed.run", *args]
    task = subprocess.Popen(
        cmd,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
        stdin=subprocess.PIPE,
        env=os.environ,
    )
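Piping stdout and stderr lets the wrapper own the child's output instead of having both processes write to the terminal directly. A generic pattern for draining a piped stream (illustrative; the wrapper's actual forwarding logic may differ):

    # Stream the child's stdout line by line as it arrives.
    for line in iter(task.stdout.readline, b""):
        print(line.decode("utf-8"), end="")
    task.wait()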
diff --git a/python/test_support/test_pytorch_training_file.py b/python/test_support/test_pytorch_training_file.py
new file mode 100644
index 00000000000..4107197acfd
--- /dev/null
+++ b/python/test_support/test_pytorch_training_file.py
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# type: ignore
+
+batch_size = 100
+num_epochs = 3
+momentum = 0.5
+log_interval = 100
+
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.optim as optim
+from torchvision import datasets, transforms
+import tempfile
+import shutil
+
+
+class Net(nn.Module):
+    def __init__(self):
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
+        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
+        self.conv2_drop = nn.Dropout2d()
+        self.fc1 = nn.Linear(320, 50)
+        self.fc2 = nn.Linear(50, 10)
+
+    def forward(self, x):
+        x = F.relu(F.max_pool2d(self.conv1(x), 2))
+        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
+        x = x.view(-1, 320)
+        x = F.relu(self.fc1(x))
+        x = F.dropout(x, training=self.training)
+        x = self.fc2(x)
+        return F.log_softmax(x)
+
+
+def train_one_epoch(model, data_loader, optimizer, epoch):
+    model.train()
+    for batch_idx, (data, target) in enumerate(data_loader):
+        optimizer.zero_grad()
+        output = model(data)
+        loss = F.nll_loss(output, target)
+        loss.backward()
+        optimizer.step()
+        if batch_idx % log_interval == 0:
+            print(
+                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
+                    epoch,
+                    batch_idx * len(data),
+                    len(data_loader) * len(data),
+                    100.0 * batch_idx / len(data_loader),
+                    loss.item(),
+                )
+            )
+
+
+def train(learning_rate):
+    import torch.distributed as dist
+    from torch.nn.parallel import DistributedDataParallel as DDP
+    from torch.utils.data.distributed import DistributedSampler
+
+    print("Running distributed training")
+    dist.init_process_group("gloo")
+
+    temp_dir = tempfile.mkdtemp()
+
+    train_dataset = datasets.MNIST(
+        temp_dir,
+        train=True,
+        download=True,
+        transform=transforms.Compose(
+            [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
+        ),
+    )
+
+    train_sampler = DistributedSampler(dataset=train_dataset)
+    data_loader = torch.utils.data.DataLoader(
+        train_dataset, batch_size=batch_size, sampler=train_sampler
+    )
+
+    model = Net()
+    ddp_model = DDP(model)
+
+    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=momentum)
+    for epoch in range(1, num_epochs + 1):
+        train_one_epoch(ddp_model, data_loader, optimizer, epoch)
+
+    dist.destroy_process_group()
+
+    shutil.rmtree(temp_dir)
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("lr", help="learning_rate", default=0.001)
+    args = parser.parse_args()
+    print("learning rate chosen: ", float(args.lr))
+    train(float(args.lr))
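One optional refinement for DistributedSampler, not applied in the test file above (and not needed for a smoke test): call set_epoch() so each epoch reshuffles the data differently across workers.

    # Sketch: per-epoch reshuffling with DistributedSampler.
    for epoch in range(1, num_epochs + 1):
        train_sampler.set_epoch(epoch)  # seeds the shuffle with the epoch number
        train_one_epoch(ddp_model, data_loader, optimizer, epoch)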
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]