This is an automated email from the ASF dual-hosted git repository.
ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 5b4e3480 [LIVY-1024] Upgrade Livy to Python3
5b4e3480 is described below
commit 5b4e3480c51298d02242eb427717d97d91eae011
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu Dec 4 20:34:38 2025 +0530
[LIVY-1024] Upgrade Livy to Python3
## What changes were proposed in this pull request?
* Python 2 was deprecated a while back; most systems/environments no longer support it
* Bump Livy to Python 3 and remove Python 2 support, since Spark 2 has also now been deprecated (see the sketch below)
* Closes LIVY-1024
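
For reference, a minimal sketch (not part of the patch) of the two Python 2-to-3 idioms this change applies most often: `urllib2` calls become `urllib.request`, and lazy `filter`/`map` results become list comprehensions so they stay indexable. The helper names below (`fetch_json`, `unreleased_versions`) are hypothetical and only illustrate the pattern.

```python
import json
import urllib.request
from urllib.error import HTTPError

def fetch_json(url, token=None):
    # urllib.request replaces the Python 2-only urllib2 module
    request = urllib.request.Request(url)
    if token:
        request.add_header('Authorization', 'token %s' % token)
    try:
        return json.load(urllib.request.urlopen(request))
    except HTTPError as e:
        raise SystemExit('Unable to fetch URL %s: %s' % (url, e))

def unreleased_versions(versions):
    # In Python 3, filter() returns a lazy iterator; code that indexes or
    # re-iterates the result is rewritten as a list comprehension instead.
    return [v for v in versions if not v['released']]
```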
## How was this patch tested?
* CI, plus manually testing the Python 3 files locally
---
README.md | 6 +--
dev/docker/livy-dev-base/Dockerfile | 28 ++--------
dev/merge_livy_pr.py | 58 ++++++--------------
examples/src/main/python/pi_app.py | 4 +-
integration-test/src/test/resources/batch.py | 2 +-
.../src/test/resources/test_python_api.py | 44 +++++----------
.../test/scala/org/apache/livy/test/JobApiIT.scala | 2 +-
python-api/setup.py | 6 +--
python-api/src/main/python/livy/client.py | 11 ++--
python-api/src/main/python/livy/job_context.py | 5 +-
python-api/src/main/python/livy/job_handle.py | 7 +--
.../src/test/python/livy-tests/client_test.py | 23 ++++----
repl/src/main/resources/fake_shell.py | 63 +++-------------------
.../org/apache/livy/repl/PythonInterpreter.scala | 2 +-
14 files changed, 71 insertions(+), 190 deletions(-)
diff --git a/README.md b/README.md
index 5fbe4f46..150243f6 100644
--- a/README.md
+++ b/README.md
@@ -29,20 +29,20 @@ To build Livy, you will need:
Debian/Ubuntu:
* mvn (from ``maven`` package or maven3 tarball)
* openjdk-8-jdk (or Oracle JDK 8)
- * Python 2.7+
+ * Python 3.x+
* R 3.x
Redhat/CentOS:
* mvn (from ``maven`` package or maven3 tarball)
* java-1.8.0-openjdk (or Oracle JDK 8)
- * Python 2.7+
+ * Python 3.x+
* R 3.x
MacOS:
* Xcode command line tools
* Oracle's JDK 1.8
* Maven (Homebrew)
- * Python 2.7+
+ * Python 3.x+
* R 3.x
Required python packages for building Livy:
diff --git a/dev/docker/livy-dev-base/Dockerfile b/dev/docker/livy-dev-base/Dockerfile
index 07711a5f..5686e61d 100644
--- a/dev/docker/livy-dev-base/Dockerfile
+++ b/dev/docker/livy-dev-base/Dockerfile
@@ -70,32 +70,11 @@ RUN git clone https://github.com/pyenv/pyenv.git $HOME/pyenv
ENV PYENV_ROOT=$HOME/pyenv
ENV PATH="$HOME/pyenv/shims:$HOME/pyenv/bin:$HOME/bin:$PATH"
-RUN pyenv install -v 2.7.18 && \
- pyenv install -v 3.9.21 && \
- pyenv global 2.7.18 3.9.21 && \
+RUN pyenv install -v 3.9.21 && \
+ pyenv global 3.9.21 && \
pyenv rehash
-# Add build dependencies for python2
-# - First we upgrade pip because that makes a lot of things better
-# - Then we remove the provided version of setuptools and install a different version
-# - Then we install additional dependencies
-RUN python2 -m pip install -U "pip < 21.0" && \
- apt-get remove -y python-setuptools && \
- python2 -m pip install "setuptools < 36" && \
- python2 -m pip install \
- cloudpickle \
- codecov \
- flake8 \
- flaky \
- "future>=0.15.2" \
- "futures>=3.0.5" \
- pytest \
- pytest-runner \
- requests-kerberos \
- "requests >= 2.10.0" \
- "responses >= 0.5.1"
-
-# Now do the same for python3
+# Install build dependencies for python3
RUN python3 -m pip install -U pip && pip3 install \
cloudpickle \
codecov \
@@ -112,4 +91,3 @@ RUN pyenv rehash
RUN apt remove -y openjdk-11-jre-headless
WORKDIR /workspace
-
diff --git a/dev/merge_livy_pr.py b/dev/merge_livy_pr.py
index 85ce7417..335607ff 100755
--- a/dev/merge_livy_pr.py
+++ b/dev/merge_livy_pr.py
@@ -33,21 +33,15 @@
# usage: ./merge_livy_pr.py (see config env vars below)
#
-
import json
import os
import re
import subprocess
import sys
+import urllib.request
+from urllib.error import HTTPError
-if sys.version_info[0] < 3:
- import urllib2
- from urllib2 import HTTPError
- input_prompt_fn = raw_input
-else:
- import urllib.request as urllib2
- from urllib.error import HTTPError
- input_prompt_fn = input
+input_prompt_fn = input
try:
import jira.client
@@ -71,7 +65,6 @@ JIRA_PASSWORD = os.environ.get("JIRA_PASSWORD", "")
# https://github.com/settings/tokens. This script only requires the "public_repo" scope.
GITHUB_OAUTH_KEY = os.environ.get("GITHUB_OAUTH_KEY")
-
GITHUB_BASE = "https://github.com/apache/incubator-livy/pull"
GITHUB_API_BASE = "https://api.github.com/repos/apache/incubator-livy"
JIRA_BASE = "https://issues.apache.org/jira/browse"
@@ -79,13 +72,12 @@ JIRA_API_BASE = "https://issues.apache.org/jira"
# Prefix added to temporary branches
BRANCH_PREFIX = "PR_TOOL"
-
def get_json(url):
try:
- request = urllib2.Request(url)
+ request = urllib.request.Request(url)
if GITHUB_OAUTH_KEY:
request.add_header('Authorization', 'token %s' % GITHUB_OAUTH_KEY)
- return json.load(urllib2.urlopen(request))
+ return json.load(urllib.request.urlopen(request))
except HTTPError as e:
if "X-RateLimit-Remaining" in e.headers and e.headers["X-RateLimit-Remaining"] == '0':
print("Exceeded the GitHub API rate limit; see the instructions in " +
@@ -95,42 +87,34 @@ def get_json(url):
print("Unable to fetch URL, exiting: %s" % url)
sys.exit(-1)
-
def fail(msg):
print(msg)
clean_up()
sys.exit(-1)
-
def run_cmd(cmd):
print(cmd)
if isinstance(cmd, list):
out_bytes = subprocess.check_output(cmd)
else:
out_bytes = subprocess.check_output(cmd.split(" "))
- if sys.version_info[0] > 2:
- return out_bytes.decode()
- else:
- return out_bytes
-
+ return out_bytes.decode()
def continue_maybe(prompt):
result = input_prompt_fn("\n%s (y/n): " % prompt)
if result.lower() != "y":
fail("Okay, exiting")
-
def clean_up():
print("Restoring head pointer to %s" % original_head)
run_cmd("git checkout %s" % original_head)
branches = run_cmd("git branch").replace(" ", "").split("\n")
- for branch in filter(lambda x: x.startswith(BRANCH_PREFIX), branches):
+ for branch in [x for x in branches if x.startswith(BRANCH_PREFIX)]:
print("Deleting local branch %s" % branch)
run_cmd("git branch -D %s" % branch)
-
# merge the requested PR and return the merge hash
def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
pr_branch_name = "%s_MERGE_PR_%s" % (BRANCH_PREFIX, pr_num)
@@ -201,7 +185,6 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
print("Merge hash: %s" % merge_hash)
return merge_hash
-
def cherry_pick(pr_num, merge_hash, default_branch):
pick_ref = input_prompt_fn("Enter a branch name [%s]: " % default_branch)
if pick_ref == "":
@@ -236,15 +219,13 @@ def cherry_pick(pr_num, merge_hash, default_branch):
print("Pick hash: %s" % pick_hash)
return pick_ref
-
def fix_version_from_branch(branch, versions):
# Note: Assumes this is a sorted (newest->oldest) list of un-released versions
if branch == "master":
return versions[0]
else:
branch_ver = branch.replace("branch-", "")
- return filter(lambda x: x.name.startswith(branch_ver), versions)[-1]
-
+ return [x for x in versions if x.name.startswith(branch_ver)][-1]
def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
asf_jira = jira.client.JIRA({'server': JIRA_API_BASE},
@@ -275,11 +256,11 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
versions = asf_jira.project_versions("LIVY")
versions = sorted(versions, key=lambda x: x.name, reverse=True)
- versions = filter(lambda x: x.raw['released'] is False, versions)
+ versions = [x for x in versions if x.raw['released'] is False]
# Consider only x.y.z versions
- versions = filter(lambda x: re.match('\d+\.\d+\.\d+', x.name), versions)
+ versions = [x for x in versions if re.match(r'\d+\.\d+\.\d+', x.name)]
- default_fix_versions = map(lambda x: fix_version_from_branch(x, versions).name, merge_branches)
+ default_fix_versions = [fix_version_from_branch(x, versions).name for x in merge_branches]
for v in default_fix_versions:
# Handles the case where we have forked a release branch but not yet made the release.
# In this case, if the PR is committed to the master branch and the release branch, we
@@ -289,7 +270,7 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
if patch == "0":
previous = "%s.%s.%s" % (major, int(minor) - 1, 0)
if previous in default_fix_versions:
- default_fix_versions = filter(lambda x: x != v, default_fix_versions)
+ default_fix_versions = [x for x in default_fix_versions if x != v]
default_fix_versions = ",".join(default_fix_versions)
fix_versions = input_prompt_fn(
@@ -299,19 +280,18 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
fix_versions = fix_versions.replace(" ", "").split(",")
def get_version_json(version_str):
- return filter(lambda v: v.name == version_str, versions)[0].raw
+ return [v for v in versions if v.name == version_str][0].raw
- jira_fix_versions = map(lambda v: get_version_json(v), fix_versions)
+ jira_fix_versions = [get_version_json(v) for v in fix_versions]
- resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
- resolution = filter(lambda r: r.raw['name'] == "Fixed", asf_jira.resolutions())[0]
+ resolve = [a for a in asf_jira.transitions(jira_id) if a['name'] == "Resolve Issue"][0]
+ resolution = [r for r in asf_jira.resolutions() if r.raw['name'] == "Fixed"][0]
asf_jira.transition_issue(
jira_id, resolve["id"], fixVersions=jira_fix_versions,
comment=comment, resolution={'id': resolution.raw['id']})
print("Successfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions))
-
def resolve_jira_issues(title, merge_branches, comment):
jira_ids = re.findall("LIVY-[0-9]{3,6}", title)
@@ -320,7 +300,6 @@ def resolve_jira_issues(title, merge_branches, comment):
for jira_id in jira_ids:
resolve_jira_issue(merge_branches, comment, jira_id)
-
def standardize_jira_ref(text):
"""
Standardize the [LIVY-XXXXX] [MODULE] prefix
@@ -362,7 +341,6 @@ def standardize_jira_ref(text):
return clean_text
-
def get_current_ref():
ref = run_cmd("git rev-parse --abbrev-ref HEAD").strip()
if ref == 'HEAD':
@@ -371,7 +349,6 @@ def get_current_ref():
else:
return ref
-
def main():
global original_head
@@ -379,7 +356,7 @@ def main():
original_head = get_current_ref()
branches = get_json("%s/branches" % GITHUB_API_BASE)
- branch_names = filter(lambda x: x.startswith("branch-"), [x['name'] for x in branches])
+ branch_names = [x for x in [x['name'] for x in branches] if x.startswith("branch-")]
# Assumes branch names can be sorted lexicographically
latest_branch = sorted(branch_names, reverse=True)[0]
@@ -462,7 +439,6 @@ def main():
print("Could not find jira-python library. Run 'sudo pip install jira' to install.")
print("Exiting without trying to close the associated JIRA.")
-
if __name__ == "__main__":
import doctest
(failure_count, test_count) = doctest.testmod()
diff --git a/examples/src/main/python/pi_app.py b/examples/src/main/python/pi_app.py
index 2945d4b3..c283476c 100644
--- a/examples/src/main/python/pi_app.py
+++ b/examples/src/main/python/pi_app.py
@@ -15,8 +15,6 @@
# limitations under the License.
#
-from __future__ import print_function
-
import sys
from random import random
from operator import add
@@ -48,7 +46,7 @@ if __name__ == "__main__":
return 1 if x ** 2 + y ** 2 <= 1 else 0
def pi_job(context):
- count = context.sc.parallelize(range(1, samples + 1), slices).map(f).reduce(add)
+ count = context.sc.parallelize(list(range(1, samples + 1)), slices).map(f).reduce(add)
return 4.0 * count / samples
pi = client.submit(pi_job).result()
diff --git a/integration-test/src/test/resources/batch.py b/integration-test/src/test/resources/batch.py
index 56a53cef..e371652a 100644
--- a/integration-test/src/test/resources/batch.py
+++ b/integration-test/src/test/resources/batch.py
@@ -22,6 +22,6 @@ from pyspark import SparkContext
output = sys.argv[1]
sc = SparkContext(appName="PySpark Test")
try:
- sc.parallelize(range(100), 10).map(lambda x: (x, x * 2)).saveAsTextFile(output)
+ sc.parallelize(list(range(100)), 10).map(lambda x: (x, x * 2)).saveAsTextFile(output)
finally:
sc.stop()
diff --git a/integration-test/src/test/resources/test_python_api.py b/integration-test/src/test/resources/test_python_api.py
index f89f85d8..bef67bd0 100644
--- a/integration-test/src/test/resources/test_python_api.py
+++ b/integration-test/src/test/resources/test_python_api.py
@@ -14,22 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
import os
import base64
import json
import time
-try:
- from urllib.parse import urlparse
-except ImportError:
- from urlparse import urlparse
+from urllib.parse import urlparse
import requests
from requests_kerberos import HTTPKerberosAuth, REQUIRED, OPTIONAL
import cloudpickle
import pytest
-try:
- import httplib
-except ImportError:
- from http import HTTPStatus as httplib
+import http.client
from flaky import flaky
global session_id, job_id
@@ -58,7 +53,6 @@ upload_pyfile_url = os.environ.get("UPLOAD_PYFILE_URL")
def after_all(request):
request.addfinalizer(stop_session)
-
def process_job(job, expected_result, is_error_job=False):
global job_id
@@ -69,7 +63,7 @@ def process_job(job, expected_result, is_error_job=False):
header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
response = requests.request('POST', request_url, headers=header,
data=base64_pickled_job_json, auth=request_auth, verify=ssl_cert)
- assert response.status_code == httplib.CREATED
+ assert response.status_code == http.client.CREATED
job_id = response.json()['id']
poll_time = 1
@@ -85,7 +79,7 @@ def process_job(job, expected_result, is_error_job=False):
poll_time *= 2
assert poll_response.json()['id'] == job_id
- assert poll_response.status_code == httplib.OK
+ assert poll_response.status_code == http.client.OK
if not is_error_job:
assert poll_response.json()['error'] is None
result = poll_response.json()['result']
@@ -97,20 +91,17 @@ def process_job(job, expected_result, is_error_job=False):
error = poll_response.json()['error']
assert expected_result in error
-
def delay_rerun(*args):
time.sleep(10)
return True
-
def stop_session():
global session_id
request_url = livy_end_point + "/sessions/" + str(session_id)
headers = {'X-Requested-By': 'livy'}
response = requests.request('DELETE', request_url, headers=headers,
auth=request_auth, verify=ssl_cert)
- assert response.status_code == httplib.OK
-
+ assert response.status_code == http.client.OK
def test_create_session():
global session_id
@@ -121,21 +112,19 @@ def test_create_session():
json_data = json.dumps({'kind': 'pyspark', 'conf': {'livy.uri':
uri.geturl()}})
response = requests.request('POST', request_url, headers=header,
data=json_data, auth=request_auth, verify=ssl_cert)
- assert response.status_code == httplib.CREATED
+ assert response.status_code == http.client.CREATED
session_id = response.json()['id']
-
@flaky(max_runs=6, rerun_filter=delay_rerun)
def test_wait_for_session_to_become_idle():
request_url = livy_end_point + "/sessions/" + str(session_id)
header = {'X-Requested-By': 'livy'}
response = requests.request('GET', request_url, headers=header,
auth=request_auth, verify=ssl_cert)
- assert response.status_code == httplib.OK
+ assert response.status_code == http.client.OK
session_state = response.json()['state']
assert session_state == 'idle'
-
def test_spark_job():
def simple_spark_job(context):
elements = [10, 20, 30]
@@ -144,7 +133,6 @@ def test_spark_job():
process_job(simple_spark_job, 3)
-
def test_error_job():
def error_job(context):
return "hello" + 1
@@ -152,7 +140,6 @@ def test_error_job():
process_job(error_job,
"TypeError: ", True)
-
def test_reconnect():
global session_id
@@ -160,10 +147,9 @@ def test_reconnect():
header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
response = requests.request('POST', request_url, headers=header,
auth=request_auth, verify=ssl_cert)
- assert response.status_code == httplib.OK
+ assert response.status_code == http.client.OK
assert session_id == response.json()['id']
-
def test_add_file():
add_file_name = os.path.basename(add_file_url)
json_data = json.dumps({'uri': add_file_url})
@@ -171,7 +157,7 @@ def test_add_file():
header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
response = requests.request('POST', request_url, headers=header,
data=json_data, auth=request_auth, verify=ssl_cert)
- assert response.status_code == httplib.OK
+ assert response.status_code == http.client.OK
def add_file_job(context):
from pyspark import SparkFiles
@@ -181,7 +167,6 @@ def test_add_file():
process_job(add_file_job, "hello from addfile")
-
def test_add_pyfile():
add_pyfile_name_with_ext = os.path.basename(add_pyfile_url)
add_pyfile_name = add_pyfile_name_with_ext.rsplit('.', 1)[0]
@@ -190,7 +175,7 @@ def test_add_pyfile():
header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
response_add_pyfile = requests.request('POST', request_url,
headers=header, data=json_data, auth=request_auth, verify=ssl_cert)
- assert response_add_pyfile.status_code == httplib.OK
+ assert response_add_pyfile.status_code == http.client.OK
def add_pyfile_job(context):
pyfile_module = __import__ (add_pyfile_name)
@@ -198,7 +183,6 @@ def test_add_pyfile():
process_job(add_pyfile_job, "hello from addpyfile")
-
def test_upload_file():
upload_file = open(upload_file_url)
upload_file_name = os.path.basename(upload_file.name)
@@ -207,7 +191,7 @@ def test_upload_file():
header = {'X-Requested-By': 'livy'}
response = requests.request('POST', request_url, headers=header,
files=files, auth=request_auth, verify=ssl_cert)
- assert response.status_code == httplib.OK
+ assert response.status_code == http.client.OK
def upload_file_job(context):
from pyspark import SparkFiles
@@ -217,7 +201,6 @@ def test_upload_file():
process_job(upload_file_job, "hello from uploadfile")
-
def test_upload_pyfile():
upload_pyfile = open(upload_pyfile_url)
upload_pyfile_name_with_ext = os.path.basename(upload_pyfile.name)
@@ -226,14 +209,13 @@ def test_upload_pyfile():
files = {'file': upload_pyfile}
header = {'X-Requested-By': 'livy'}
response = requests.request('POST', request_url, headers=header,
files=files, auth=request_auth, verify=ssl_cert)
- assert response.status_code == httplib.OK
+ assert response.status_code == http.client.OK
def upload_pyfile_job(context):
pyfile_module = __import__ (upload_pyfile_name)
return pyfile_module.test_upload_pyfile()
process_job(upload_pyfile_job, "hello from uploadpyfile")
-
if __name__ == '__main__':
value = pytest.main([os.path.dirname(__file__)])
if value != 0:
diff --git a/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala b/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
index 92c3ea24..d62906ae 100644
--- a/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
+++ b/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
@@ -255,7 +255,7 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logg
val testDir = Files.createTempDirectory(tmpDir.toPath(),
"python-tests-").toFile()
val testFile = createPyTestsForPythonAPI(testDir)
- val builder = new ProcessBuilder(Seq("python", testFile.getAbsolutePath()).asJava)
+ val builder = new ProcessBuilder(Seq("python3", testFile.getAbsolutePath()).asJava)
builder.directory(testDir)
val env = builder.environment()
diff --git a/python-api/setup.py b/python-api/setup.py
index 59bec11f..511592f2 100644
--- a/python-api/setup.py
+++ b/python-api/setup.py
@@ -23,15 +23,13 @@ CLASSIFIERS = [
'Development Status :: 1 - Planning',
'Intended Audience :: Developers',
'Operating System :: OS Independent',
- 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 3.9',
'Topic :: Software Development :: Libraries :: Python Modules',
]
requirements = [
'cloudpickle>=0.2.1',
- 'configparser>=3.5.0',
- 'future>=0.15.2',
- 'mock~=3.0.5',
'requests>=2.10.0',
'responses>=0.5.1',
'requests-kerberos>=0.11.0',
diff --git a/python-api/src/main/python/livy/client.py b/python-api/src/main/python/livy/client.py
index d9830f32..22b88132 100644
--- a/python-api/src/main/python/livy/client.py
+++ b/python-api/src/main/python/livy/client.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from __future__ import absolute_import
import base64
import cloudpickle
@@ -25,13 +24,13 @@ import threading
import traceback
from configparser import ConfigParser
from concurrent.futures import ThreadPoolExecutor
-from future.moves.urllib.parse import ParseResult, urlparse
-from io import open, StringIO
+from urllib.parse import ParseResult, urlparse
+from io import StringIO
from requests_kerberos import HTTPKerberosAuth, REQUIRED
from livy.job_handle import JobHandle
-class HttpClient(object):
+class HttpClient:
"""A http based client for submitting Spark-based jobs to a Livy backend.
Parameters
@@ -357,7 +356,7 @@ class HttpClient(object):
self._config.remove_option(self._CONFIG_SECTION, key)
def _set_multiple_conf(self, conf_dict):
- for key, value in conf_dict.items():
+ for key, value in list(conf_dict.items()):
self._set_conf(key, value)
def _load_config(self, load_defaults, conf_dict):
@@ -426,7 +425,7 @@ class HttpClient(object):
data=data, headers=headers).content
-class _LivyConnection(object):
+class _LivyConnection:
_SESSIONS_URI = '/sessions'
# Timeout in seconds
diff --git a/python-api/src/main/python/livy/job_context.py b/python-api/src/main/python/livy/job_context.py
index 83236499..a36dbc2a 100644
--- a/python-api/src/main/python/livy/job_context.py
+++ b/python-api/src/main/python/livy/job_context.py
@@ -14,10 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
from abc import ABCMeta, abstractproperty, abstractmethod
-class JobContext:
+class JobContext(metaclass=ABCMeta):
"""
An abstract class that holds runtime information about the job execution
context.
@@ -28,8 +29,6 @@ class JobContext:
"""
- __metaclass__ = ABCMeta
-
@abstractproperty
def sc(self):
"""
diff --git a/python-api/src/main/python/livy/job_handle.py b/python-api/src/main/python/livy/job_handle.py
index 278834c2..fc4dcde2 100644
--- a/python-api/src/main/python/livy/job_handle.py
+++ b/python-api/src/main/python/livy/job_handle.py
@@ -14,9 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
import base64
import cloudpickle
-import sys
import threading
import traceback
from concurrent.futures import Future
@@ -217,10 +217,7 @@ class JobHandle(Future):
raise NotImplementedError("This operation is not supported.")
def set_job_exception(self, exception, error_msg=None):
- if sys.version >= '3':
- super(JobHandle, self).set_exception(exception)
- else:
- super(JobHandle, self).set_exception_info(exception, error_msg)
+ super(JobHandle, self).set_exception(exception)
class _RepeatedTimer(object):
def __init__(self, interval, polling_job, executor):
diff --git a/python-api/src/test/python/livy-tests/client_test.py b/python-api/src/test/python/livy-tests/client_test.py
index b6426ae1..efa3d446 100644
--- a/python-api/src/test/python/livy-tests/client_test.py
+++ b/python-api/src/test/python/livy-tests/client_test.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
import os
import pytest
import responses
@@ -27,7 +28,7 @@ from livy.client import HttpClient
session_id = 0
job_id = 1
# Make sure host name is lower case. See LIVY-582
-base_uri = 'http://{0}:{1}'.format(socket.gethostname().lower(), 8998)
+base_uri = f'http://{socket.gethostname().lower()}:{8998}'
client_test = None
invoked_queued_callback = False
invoked_running_callback = False
@@ -40,8 +41,8 @@ def mock_and_validate_create_new_session(defaults):
app_name = 'Test App'
conf_dict = {'spark.app.name': app_name}
json_data = {
- u'kind': u'pyspark', u'log': [], u'proxyUser': None,
- u'state': u'starting', u'owner': None, u'id': session_id
+ 'kind': 'pyspark', 'log': [], 'proxyUser': None,
+ 'state': 'starting', 'owner': None, 'id': session_id
}
responses.add(responses.POST, create_session_request_mock_uri,
json=json_data, status=201, content_type='application/json')
@@ -68,13 +69,13 @@ def mock_submit_job_and_poll_result(
+ "/jobs/" + str(job_id)
post_json_data = {
- u'state': u'SENT', u'error': None, u'id': job_id, u'result': None
+ 'state': 'SENT', 'error': None, 'id': job_id, 'result': None
}
responses.add(responses.POST, submit_request_mock_uri, status=201,
json=post_json_data, content_type='application/json')
get_json_data = {
- u'state': job_state, u'error': error, u'id': job_id, u'result': result
+ 'state': job_state, 'error': error, 'id': job_id, 'result': result
}
responses.add(responses.GET, poll_request_mock_uri, status=200,
json=get_json_data, content_type='application/json')
@@ -117,8 +118,8 @@ def test_connect_to_existing_session():
"/connect"
reconnect_session_uri = base_uri + "/sessions/" + str(session_id)
json_data = {
- u'kind': u'pyspark', u'log': [], u'proxyUser': None,
- u'state': u'starting', u'owner': None, u'id': session_id
+ 'kind': 'pyspark', 'log': [], 'proxyUser': None,
+ 'state': 'starting', 'owner': None, 'id': session_id
}
with responses.RequestsMock() as rsps:
rsps.add(responses.POST, reconnect_mock_request_uri, json=json_data,
@@ -143,7 +144,7 @@ def create_test_archive(ext):
@responses.activate
def test_submit_job_verify_running_state():
submit_job_future = mock_submit_job_and_poll_result(simple_spark_job,
- u'STARTED')
+ 'STARTED')
lock = threading.Event()
def handle_job_running_callback(f):
@@ -158,7 +159,7 @@ def test_submit_job_verify_running_state():
@responses.activate
def test_submit_job_verify_queued_state():
submit_job_future = mock_submit_job_and_poll_result(simple_spark_job,
- u'QUEUED')
+ 'QUEUED')
lock = threading.Event()
def handle_job_queued_callback(f):
@@ -173,7 +174,7 @@ def test_submit_job_verify_queued_state():
@responses.activate
def test_submit_job_verify_succeeded_state():
submit_job_future = mock_submit_job_and_poll_result(simple_spark_job,
- u'SUCCEEDED',
+ 'SUCCEEDED',
result='Z0FKVkZGc3hNREFzSURJd01Dd2dNekF3TENBME1EQmRjUUF1')
result = submit_job_future.result(15)
assert result == '[100, 200, 300, 400]'
@@ -181,7 +182,7 @@ def test_submit_job_verify_succeeded_state():
@responses.activate
def test_submit_job_verify_failed_state():
- submit_job_future = mock_submit_job_and_poll_result(failure_job, u'FAILED',
+ submit_job_future = mock_submit_job_and_poll_result(failure_job, 'FAILED',
error='Error job')
exception = submit_job_future.exception(15)
assert isinstance(exception, Exception)
diff --git a/repl/src/main/resources/fake_shell.py b/repl/src/main/resources/fake_shell.py
index 5472f533..2c67a542 100644
--- a/repl/src/main/resources/fake_shell.py
+++ b/repl/src/main/resources/fake_shell.py
@@ -15,7 +15,6 @@
# limitations under the License.
#
-from __future__ import print_function
import ast
from collections import OrderedDict
import datetime
@@ -34,12 +33,6 @@ import shutil
import pickle
import textwrap
-if sys.version >= '3':
- unicode = str
-else:
- import cStringIO
- import StringIO
-
if sys.version_info > (3,8):
from ast import Module
else :
@@ -66,31 +59,25 @@ def execute_reply(status, content):
)
}
-
def execute_reply_ok(data):
return execute_reply('ok', {
'data': data,
})
-
def execute_reply_error(exc_type, exc_value, tb):
LOG.error('execute_reply', exc_info=True)
- if sys.version >= '3':
- formatted_tb = traceback.format_exception(exc_type, exc_value, tb, chain=False)
- else:
- formatted_tb = traceback.format_exception(exc_type, exc_value, tb)
+ formatted_tb = traceback.format_exception(exc_type, exc_value, tb, chain=False)
for i in range(len(formatted_tb)):
if TOP_FRAME_REGEX.match(formatted_tb[i]):
formatted_tb = formatted_tb[:1] + formatted_tb[i + 1:]
break
return execute_reply('error', {
- 'ename': unicode(exc_type.__name__),
- 'evalue': unicode(exc_value),
+ 'ename': str(exc_type.__name__),
+ 'evalue': str(exc_value),
'traceback': formatted_tb,
})
-
def execute_reply_internal_error(message, exc_info=None):
LOG.error('execute_reply_internal_error', exc_info=exc_info)
return execute_reply('error', {
@@ -99,7 +86,6 @@ def execute_reply_internal_error(message, exc_info=None):
'traceback': [],
})
-
class JobContextImpl(object):
def __init__(self):
self.lock = threading.Lock()
@@ -185,14 +171,10 @@ class JobContextImpl(object):
except:
pass
-
class PySparkJobProcessorImpl(object):
def processBypassJob(self, serialized_job):
try:
- if sys.version >= '3':
- deserialized_job = pickle.loads(serialized_job, encoding="bytes")
- else:
- deserialized_job = pickle.loads(serialized_job)
+ deserialized_job = pickle.loads(serialized_job, encoding="bytes")
result = deserialized_job(job_context)
serialized_result = global_dict['cloudpickle'].dumps(result)
response = bytearray(base64.b64encode(serialized_result))
@@ -209,15 +191,13 @@ class PySparkJobProcessorImpl(object):
def getLocalTmpDirPath(self):
return os.path.join(job_context.get_local_tmp_dir_path(), '__livy__')
- class Scala:
+ class Scala(object):
extends = ['org.apache.livy.repl.PySparkJobProcessor']
-
class ExecutionError(Exception):
def __init__(self, exc_info):
self.exc_info = exc_info
-
class NormalNode(object):
def __init__(self, code):
self.code = compile(code, '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
@@ -240,11 +220,9 @@ class NormalNode(object):
# code and passing the error along.
raise ExecutionError(sys.exc_info())
-
class UnknownMagic(Exception):
pass
-
class MagicNode(object):
def __init__(self, line):
parts = line[1:].split(' ', 1)
@@ -264,7 +242,6 @@ class MagicNode(object):
return handler(*self.rest)
-
def parse_code_into_nodes(code):
nodes = []
try:
@@ -304,7 +281,6 @@ def parse_code_into_nodes(code):
return nodes
-
def execute_request(content):
try:
code = content['code']
@@ -354,7 +330,6 @@ def execute_request(content):
return execute_reply_ok(result)
-
def magic_table_convert(value):
try:
converter = magic_table_types[type(value)]
@@ -363,7 +338,6 @@ def magic_table_convert(value):
return converter(value)
-
def magic_table_convert_seq(items):
last_item_type = None
converted_items = []
@@ -380,7 +354,6 @@ def magic_table_convert_seq(items):
return 'ARRAY_TYPE', converted_items
-
def magic_table_convert_map(m):
last_key_type = None
last_value_type = None
@@ -404,7 +377,6 @@ def magic_table_convert_map(m):
return 'MAP_TYPE', converted_items
-
magic_table_types = {
type(None): lambda x: ('NULL_TYPE', x),
bool: lambda x: ('BOOLEAN_TYPE', x),
@@ -419,15 +391,6 @@ magic_table_types = {
dict: magic_table_convert_map,
}
-# python 2.x only
-if sys.version < '3':
- magic_table_types.update({
- long: lambda x: ('BIGINT_TYPE', x),
- unicode: lambda x: ('STRING_TYPE', x.encode('utf-8'))
- })
-
-
-
def magic_table(name):
try:
value = global_dict[name]
@@ -444,7 +407,7 @@ def magic_table(name):
for row in value:
cols = []
data.append(cols)
-
+
if 'Row' == row.__class__.__name__:
row = row.asDict()
@@ -488,7 +451,6 @@ def magic_table(name):
}
}
-
def magic_json(name):
try:
value = global_dict[name]
@@ -507,9 +469,7 @@ def magic_matplot(name):
imgdata = io.BytesIO()
fig.savefig(imgdata, format='png')
imgdata.seek(0)
- encode = base64.b64encode(imgdata.getvalue())
- if sys.version >= '3':
- encode = encode.decode()
+ encode = base64.b64encode(imgdata.getvalue()).decode()
except:
exc_type, exc_value, tb = sys.exc_info()
@@ -523,7 +483,6 @@ def magic_matplot(name):
def shutdown_request(_content):
sys.exit()
-
magic_router = {
'table': magic_table,
'json': magic_json,
@@ -541,24 +500,18 @@ class UnicodeDecodingStringIO(io.StringIO):
s = s.decode("utf-8")
super(UnicodeDecodingStringIO, self).write(s)
-
def clearOutputs():
sys.stdout.close()
sys.stderr.close()
sys.stdout = UnicodeDecodingStringIO()
sys.stderr = UnicodeDecodingStringIO()
-
def main():
sys_stdin = sys.stdin
sys_stdout = sys.stdout
sys_stderr = sys.stderr
- if sys.version >= '3':
- sys.stdin = io.StringIO()
- else:
- sys.stdin = cStringIO.StringIO()
-
+ sys.stdin = io.StringIO()
sys.stdout = UnicodeDecodingStringIO()
sys.stderr = UnicodeDecodingStringIO()
diff --git a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
index 58b7147a..40a25c55 100644
--- a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
@@ -49,7 +49,7 @@ object PythonInterpreter extends Logging {
val pythonExec = conf.getOption("spark.pyspark.python")
.orElse(sys.env.get("PYSPARK_PYTHON"))
.orElse(sys.props.get("pyspark.python")) // This java property is only used for internal UT.
- .getOrElse("python")
+ .getOrElse("python3")
val secretKey = Utils.createSecret(256)
val gatewayServer = createGatewayServer(sparkEntries, secretKey)