Hi, I’m being hit by this too; I’ve submitted an MR at [1].
You’ll also find attached the same fix applied on top of the version of dput currently in the archive, which is slightly different from the one in the Salsa repo. You can put it in /usr/share/dput/dput/dput.py as a short-term workaround.

[1] https://salsa.debian.org/debian/dput/-/merge_requests/13

Happy hacking,
-- 
Aurélien
#! /usr/bin/python3 # # dput/dput.py # Part of ???dput???, a Debian package upload toolkit. # # This is free software, and you are welcome to redistribute it under # certain conditions; see the end of this file for copyright # information, grant of license, and disclaimer of warranty. """ dput ??? Debian package upload tool. """ import configparser import email.parser from hashlib import ( md5, sha1, ) import importlib import os import os.path import pkgutil import re import signal import stat import subprocess import sys import textwrap import types from . import configfile from . import crypto from .helper import dputhelper app_library_path = os.path.dirname(__file__) debug = 0 def import_upload_functions(): """ Import upload method modules and make them available. """ upload_methods = {} package_name = "methods" modules_path = os.path.join(app_library_path, package_name) modules_found = [ name for (__, name, ispkg) in pkgutil.iter_modules([modules_path]) if not ispkg] if debug: sys.stdout.write("D: modules_found: {!r}\n".format(modules_found)) for module_name in modules_found: module = importlib.import_module("{package}.{module}".format( package=".".join(["dput", package_name]), module=module_name)) if debug: sys.stdout.write("D: Module: {module_name} ({module!r})\n".format( **vars())) method_name = module_name if debug: sys.stdout.write("D: Method name: {}\n".format(method_name)) upload_methods[method_name] = module.upload return upload_methods def parse_changes(changes_file): """ Parse the changes file. 
""" check = changes_file.read(5) if check != "-----": changes_file.seek(0) else: # found a PGP header, gonna ditch the next 3 lines # eat the rest of the line changes_file.readline() # Hash: SHA1 changes_file.readline() # empty line changes_file.readline() if not changes_file.readline().find("Format") != -1: changes_file.readline() changes_text = changes_file.read() changes = email.parser.HeaderParser().parsestr(changes_text) if 'files' not in changes: raise KeyError("No Files field in upload control file") for file_spec in changes['files'].strip().split("\n"): if len(file_spec.split()) != 5: sys.stderr.write(textwrap.dedent("""\ Invalid Files line in .changes: {} """).format(file_spec)) sys.exit(1) return changes def read_configs(config_files, debug): """ Read configuration settings from config files. :param config_files: Sequence of configuration files, open for reading. :param debug: If true, enable debugging output. :return: The resulting `ConfigParser` instance. The config parser will parse each file in `config_files`. Configuration from later files overrides earlier files. 
""" config = configparser.ConfigParser() config.set('DEFAULT', 'login', "username") config.set('DEFAULT', 'method', "scp") config.set('DEFAULT', 'hash', "md5") config.set('DEFAULT', 'allow_unsigned_uploads', "0") config.set('DEFAULT', 'allow_dcut', "0") config.set('DEFAULT', 'distributions', "") config.set('DEFAULT', 'allowed_distributions', "") config.set('DEFAULT', 'run_lintian', "0") config.set('DEFAULT', 'run_dinstall', "0") config.set('DEFAULT', 'check_version', "0") config.set('DEFAULT', 'scp_compress', "0") config.set('DEFAULT', 'default_host_main', "") config.set('DEFAULT', 'post_upload_command', "") config.set('DEFAULT', 'pre_upload_command', "") config.set('DEFAULT', 'ssh_config_options', "") config.set('DEFAULT', 'passive_ftp', "1") config.set('DEFAULT', 'progress_indicator', "0") for config_file in config_files: if debug: sys.stdout.write( "D: Parsing configuration file ???{path}???\n".format( path=config_file.name)) try: config.read_file(config_file) except configparser.ParsingError as e: sys.stderr.write(textwrap.dedent("""\ Error parsing configuration file: {0} """).format(str(e))) sys.exit(1) config_file.close() # only check for fqdn and incoming dir, rest have reasonable defaults error = 0 for section in config.sections(): if config.get(section, 'method') == "local": config.set(section, 'fqdn', "localhost") if ( not config.has_option(section, 'fqdn') and config.get(section, 'method') != "local"): sys.stderr.write( "Config error: {} must have a fqdn set\n".format(section)) error = 1 if not config.has_option(section, 'incoming'): sys.stderr.write(( "Config error: {} must have an incoming directory set\n" ).format(section)) error = 1 if error: sys.exit(1) return config def checksum_test(filename, hash_name): """ Get the hex string for the hash of a file's content. :param filename: Path to the file to read. :param hash_name: Name of the hash to use. :return: The computed hash value, as hexadecimal text. Currently supports md5, sha1. 
ripemd may come in the future. """ try: file_to_test = open(filename, 'rb') except IOError: sys.stdout.write("Can't open {}\n".format(filename)) sys.exit(1) if hash_name == 'md5': hash_type = md5 else: hash_type = sha1 check_obj = hash_type() while 1: data = file_to_test.read(65536) if len(data) == 0: break check_obj.update(data) file_to_test.close() checksum_text = check_obj.hexdigest() return checksum_text def check_upload_variant(changes, debug): """ Check if this is a binary_upload only or not. """ binary_upload = 0 if 'architecture' in changes: arch = changes['architecture'] if debug: sys.stdout.write("D: Architecture: {}\n".format(arch)) if arch.find('source') < 0: if debug: sys.stdout.write("D: Doing a binary upload only.\n") binary_upload = 1 return binary_upload def verify_signature( host, changes_file_path, dsc_file_path, config, check_only, allow_unsigned_uploads, binary_upload, debug): """ Check the signature on the two files given via function call. :param host: Configuration host name. :param changes_file_path: Filesystem path of upload control file. :param dsc_file_path: Filesystem path of source control file. :param config: `ConfigParser` instance for this application. :param check_only: If true, no upload is requested. :param allow_unsigned_uploads: If true, allow an unsigned upload. :param binary_upload: If true, this upload excludes source. :param debug: If true, enable debugging output. :return: ``None``. """ def assert_good_signature_or_exit(path): """ Assert the signature on the file at `path` is good. 
""" try: with open(path, encoding='utf-8') as infile: crypto.check_file_signature(infile) except Exception as exc: if isinstance(exc, crypto.gpg.errors.GPGMEError): sys.stdout.write("{}\n".format(exc)) sys.exit(1) else: raise if debug: sys.stdout.write( "D: upload control file: {}\n".format(changes_file_path)) sys.stdout.write( "D: source control file: {}\n".format(dsc_file_path)) if ((check_only or config.getboolean(host, 'allow_unsigned_uploads') == 0) and not allow_unsigned_uploads): sys.stdout.write("Checking signature on .changes\n") assert_good_signature_or_exit(changes_file_path) if not binary_upload: sys.stdout.write("Checking signature on .dsc\n") assert_good_signature_or_exit(dsc_file_path) # Regular expression to match the components of a Debian package version # string. # Documentation: Debian Policy ??5.6.12. ???Version???: # The format is: ???[epoch:]upstream_version[-debian_revision]???. version_regex = re.compile( "^" # Epoch is an unsigned decimal integer. "((?P<epoch>[0-9]+):)?" # Upstream version is either followed by a hyphen (implying a Debian # revision), or there are no hyphens (implying a native package). "(?P<upstream_version>(?:[0-9a-zA-Z.~+-]+(?=-)|[0-9a-zA-Z.~+]+))" # The Debian revision is separated by the last hyphen in the string. "(-(?P<debian_version>[0-9a-zA-Z.~+]+))?" "$") def source_check(changes, debug): """ Check if a source tarball has to be included in the package or not. """ include_orig = include_tar = 0 if 'version' in changes: version = changes['version'] if debug: sys.stdout.write("D: Package Version: {}\n".format(version)) version_parts = version_regex.match(version) if version_parts is None: raise dputhelper.UploadChangesInvalidVersionDputError(version) (epoch, upstream_version, debian_version) = ( version_parts.group('epoch'), version_parts.group('upstream_version'), version_parts.group('debian_version')) if (debian_version is None): # The version string implies this is a Debian native package. 
include_tar = 1 else: if (epoch is not None): if debug: sys.stdout.write("D: Epoch found\n") if debug: sys.stdout.write( "D: Upstream Version: {}\n".format(upstream_version)) sys.stdout.write( "D: Debian Version: {}\n".format(debian_version)) if debian_version in ["0.1", "1", "1.1"]: include_orig = 1 else: include_tar = 1 return (include_orig, include_tar) def verify_files( changes_file_directory, changes_file_name, host, config, check_only, check_version, allow_unsigned_uploads, debug): """ Run some tests on the files to verify that they are in good shape. :param changes_file_directory: Directory path of the upload control file. :param changes_file_name: Filename of the upload control file. :param host: Configuration host name. :param config: `ConfigParser` instance for this application. :param check_only: If true, no upload is requested. :param check_version: If true, check the package version before upload. :param allow_unsigned_uploads: If true, allow an unsigned upload. :param debug: If true, enable debugging output. :return: A collection of filesystem paths of all files to upload. """ file_seen = include_orig_tar_gz = include_tar_gz = binary_only = 0 files_to_upload = [] changes_file_path = os.path.join(changes_file_directory, changes_file_name) if debug: sys.stdout.write( "D: Validating contents of changes file {}\n".format( changes_file_path)) try: changes_file = open(changes_file_path, 'r', encoding='UTF-8') except IOError: sys.stdout.write("Can't open {}\n".format(changes_file_path)) sys.exit(1) changes = parse_changes(changes_file) changes_file.close # Find out if it's a binary only upload or not binary_upload = check_upload_variant(changes, debug) if binary_upload: dsc_file_path = "" else: dsc_file_path = None for file_spec in changes['files'].strip().split("\n"): # Filename only. 
file_name = file_spec.split()[4] if file_name.find(".dsc") != -1: if debug: sys.stdout.write("D: dsc-File: {}\n".format(file_name)) dsc_file_path = os.path.join(changes_file_directory, file_name) if not dsc_file_path: sys.stderr.write("Error: no dsc file found in sourceful upload\n") sys.exit(1) # Run the check to verify that the package has been tested. try: if config.getboolean(host, 'check_version') == 1 or check_version: version_check(changes_file_directory, changes, debug) except configparser.NoSectionError as e: sys.stderr.write("Error in config file:\n{}\n".format(e)) sys.exit(1) # Verify the signature of the maintainer verify_signature( host, changes_file_path, dsc_file_path, config, check_only, allow_unsigned_uploads, binary_upload, debug) # Check the sources (include_orig_tar_gz, include_tar_gz) = source_check(changes, debug) # Check md5sum and the size file_list = changes['files'].strip().split("\n") hash_name = config.get('DEFAULT', 'hash') for file_spec in file_list: (check_sum, size, section, priority, file_name) = file_spec.split() file_path = os.path.join(changes_file_directory, file_name) if debug: sys.stdout.write("D: File to upload: {}\n".format(file_path)) if checksum_test(file_path, hash_name) != check_sum: if debug: sys.stdout.write( "D: Checksum from .changes: {}\n".format(check_sum)) sys.stdout.write( "D: Generated Checksum: {}\n".format( checksum_test(file_path, hash_name))) sys.stdout.write( "Checksum doesn't match for {}\n".format(file_path)) sys.exit(1) else: if debug: sys.stdout.write( "D: Checksum for {} is fine\n".format(file_path)) if os.stat(file_path)[stat.ST_SIZE] != int(size): if debug: sys.stdout.write("D: size from .changes: {}\n".format(size)) sys.stdout.write( "D: calculated size: {}\n".format( os.stat(file_path)[stat.ST_SIZE])) sys.stdout.write( "size doesn't match for {}\n".format(file_path)) files_to_upload.append(file_path) # Check filenames. 
for file_path in files_to_upload: if (file_path.endswith(".orig.tar.gz") and not include_orig_tar_gz): if debug: sys.stdout.write("D: Filename: {}\n".format(file_path)) sys.stdout.write("D: Suffix: {}\n\n".format(file_path[-12:])) sys.stdout.write( "Package includes an .orig.tar.gz file although" " the debian revision suggests\n" "that it might not be required.\n") elif ( file_path.endswith(".tar.gz") and not include_tar_gz and not include_orig_tar_gz): if debug: sys.stdout.write("D: Filename: {}\n".format(file_path)) sys.stdout.write("D: Suffix: {}\n".format(file_path[-7:])) sys.stdout.write( "Package includes a .tar.gz file although" " the version suggests that it might\n" "not be required.\n") distribution = changes.get('distribution') allowed_distributions = config.get(host, 'allowed_distributions') if distribution and allowed_distributions: if debug: sys.stdout.write(( "D: Checking: distribution {distribution}" " matches {allowed_distributions}\n" ).format(**vars())) if not re.match(allowed_distributions, distribution): raise dputhelper.DputUploadFatalException(( "Error: uploading files" " for distribution {distribution} to {host} not allowed." ).format(**vars())) if debug: sys.stdout.write("D: File to upload: {}\n".format(changes_file_path)) files_to_upload.append(changes_file_path) return files_to_upload def print_config(config, debug): """ Print the configuration and exit. """ sys.stdout.write("\n") config.write(sys.stdout) sys.stdout.write("\n") def print_default_upload_method(config): """ Print the default upload method defined in the configuration. :param config: `ConfigParser` instance for this application. :return: ``None``. """ sys.stdout.write(textwrap.dedent("""\ Default Method: {} """.format(config.get('DEFAULT', 'method')))) def print_host_list(config): """ Print a list of hosts defined in the configuration. :param config: `ConfigParser` instance for this application. :return: ``None``. 
""" for section in config.sections(): distributions = "" if config.get(section, 'distributions'): distributions = ", distributions: {}".format( config.get(section, 'distributions')) sys.stdout.write(( "{section} => {fqdn}" " (Upload method: {method}{distributions})\n" ).format( section=section, fqdn=config.get(section, 'fqdn'), method=config.get(section, 'method'), distributions=distributions)) sys.stdout.write("\n") def create_upload_file( changes_file_name, host, fqdn, changes_file_directory, files_to_upload, debug): """ Write the log file for the upload. :param changes_file_name: File name of package to upload. :param host: Configuration host name. :param fqdn: Fully-qualified domain name of the remote host. :param changes_file_directory: Directory path of the upload control file. :param debug: If true, enable debugging output. :return: ``None``. The upload log file is named ???basename.hostname.upload???, where ???basename??? is the package file name without suffix, and ???hostname??? is the name of the host as specified in the configuration file. For example, uploading ???foo_1.2.3-1_xyz.deb??? to host ???bar??? will be logged to ???foo_1.2.3-1_xyz.bar.upload???. The upload log file is written to the directory containing the upload control file. 
""" # only need first part changes_file_name_base = os.path.splitext(changes_file_name)[0] log_file_name = "{name_stem}.{host}.upload".format( name_stem=changes_file_name_base, **vars()) log_file_path = os.path.join(changes_file_directory, log_file_name) if debug: sys.stdout.write("D: Writing logfile: {}\n".format(log_file_path)) try: if os.access(log_file_path, os.R_OK): log_file = open(log_file_path, 'a', encoding='UTF-8') else: log_file = open(log_file_path, 'w', encoding='UTF-8') except IOError: sys.stderr.write("Could not write {}\n".format(log_file_path)) sys.exit(1) for file_path in files_to_upload: log_entry = ( "Successfully uploaded {file_name} to {fqdn} for {host}.\n" ).format( file_name=os.path.basename(file_path), **vars()) log_file.write(log_entry) log_file.close() def run_lintian_test(changes_file): """ Run lintian on the changes file and stop if it finds errors. """ if os.access(changes_file, os.R_OK): if os.access("/usr/bin/lintian", os.R_OK): old_signal = signal.signal(signal.SIGPIPE, signal.SIG_DFL) sys.stdout.write("Package is now being checked with lintian.\n") if dputhelper.check_call( ["lintian", "-i", changes_file] ) != dputhelper.EXIT_STATUS_SUCCESS: sys.stdout.write( "\n" "Lintian says this package is not compliant" " with the current policy.\n" "Please check the current policy and your package.\n" "Also see lintian documentation about overrides.\n") sys.exit(1) else: signal.signal(signal.SIGPIPE, old_signal) return 0 else: sys.stdout.write( "lintian is not installed, skipping package test.\n") else: sys.stdout.write("Can't read {}\n".format(changes_file)) sys.exit(1) def guess_upload_host(changes_file_directory, changes_file_name, config): """ Guess the host where the package should be uploaded to. :param changes_file_directory: Directory path of the upload control file. :param changes_file_name: Filename of the upload control file. :param config: `ConfigParser` instance for this application. 
:return: The hostname determined for this upload. This is based on information from the upload control (???*.changes???) file. """ non_us = 0 distribution = "" dist_re = re.compile(r'^Distribution: (.*)') changes_file_path = os.path.join(changes_file_directory, changes_file_name) try: changes_file = open(changes_file_path, 'r', encoding='UTF-8') except IOError: sys.stdout.write("Can't open {}\n".format(changes_file_path)) sys.exit(1) lines = changes_file.readlines() for line in lines: match = dist_re.search(line) if match: distribution = match.group(1) # Try to guess a host based on the Distribution: field if distribution: for section in config.sections(): host_dists = config.get(section, 'distributions') if not host_dists: continue for host_dist in host_dists.split(","): if distribution == host_dist.strip(): if debug: sys.stdout.write(( "D: guessing host {section}" " based on distribution {host_dist}\n" ).format(**vars())) return section if len(config.get('DEFAULT', 'default_host_main')) != 0: sys.stdout.write( "Trying to upload package to {}\n".format( config.get('DEFAULT', 'default_host_main'))) return config.get('DEFAULT', 'default_host_main') else: sys.stdout.write( "Trying to upload package to ftp-master" " (ftp.upload.debian.org)\n") return "ftp-master" def get_upload_method_name_for_host(config, host): """ Get the upload method name from the config for the host. :param config: The `ConfigParser` instance for this application. :param host: The remote host for which to get the configuration. :return: The name of the upload method for the specified host. """ method_name = None if not config.get(host, 'method'): method_name = config.get('DEFAULT', 'method') else: method_name = config.get(host, 'method') return method_name def get_login_for_host(config, host, debug=False): """ Get the login username from the config for the host. :param config: The `ConfigParser` instance for this application. :param host: The remote host for which to get the configuration. 
:param debug: If true, enable debugging output. :return: The username for the specified host, if it is configured; otherwise, the system username. """ login = None if ( config.has_option(host, 'login') and config.get(host, 'login') != 'username'): login = config.get(host, 'login') if debug: sys.stdout.write( "D: Login {login} from section {host} used\n".format( **vars())) elif ( config.has_option('DEFAULT', 'login') and config.get('DEFAULT', 'login') != 'username'): login = config.get('DEFAULT', 'login') if debug: sys.stdout.write("D: Default login {login} used\n".format( **vars())) else: login = dputhelper.get_username_from_system(debug=debug) sys.stdout.write(( "D: Neither host {host} nor default login used." " Using {login}\n" ).format(**vars())) return login def get_fqdn_for_host(config, host): """ Get the FQDN to use, determined by the `host` config section. :param config: The `ConfigParser` instance for this application. :param host: The remote host for which to get the configuration. :return: The fully qualified domain name (FQDN) for the specified host. As a special case, if the host configuration's 'method' option specifies the value "local", the corresponding FQDN is always "localhost". Otherwise, the FQDN value is the host configuration's 'fqdn' option value. """ if config.get(host, 'method') == "local": fqdn = "localhost" else: config_fqdn = config.get(host, 'fqdn') if ":" in config_fqdn: # The value includes a port specification we don't want. (fqdn, __) = config_fqdn.split(":", 1) else: fqdn = config_fqdn return fqdn def get_port_for_host(config, host): """ Get the port to use, determined by the `host` config section. :param config: The `ConfigParser` instance for this application. :param host: The remote host for which to get the configuration. :return: The TCP port if specified, otherwise ``None``. If the host config specified a 'fqdn' value of the form `"example.com:port"`, the `port` value is returned. Otherwise, ``None``. 
""" port_text = None if config.get(host, 'method') != "local": config_fqdn = config.get(host, 'fqdn') if ":" in config_fqdn: (__, port_text) = config_fqdn.split(":", 1) port = int(port_text) if port_text else None return port def get_incoming_for_host(config, host): """ Get the incoming directory to use, determined by the host config. :param config: The `ConfigParser` instance for this application. :param host: The remote host for which to get the configuration. :return: The incoming directory path for the remote host. """ config_incoming = config.get(host, 'incoming') result = config_incoming.rstrip(os.path.sep) return result def get_delayed_days(days_text): """ Get the number of days specified for delayed upload. :param days_text: The text specifying number of days to delay. :return: The number of days as an integer. If the specified `days_text` is invalid, a `ValueError` is raised. """ result = None if days_text is None: return result try: days = int(days_text) except ValueError as exc: raise ValueError( "delayed days value must be a decimal integer: " "{}".format(days_text) ) from exc if days not in range(16): raise ValueError( "delayed days value must be from 0 to 15: " "{:d}".format(days)) result = days return result def get_progress_indicator_for_host(config, host): """ Get the code for the progress indicator to use for this upload. """ result = config.getint(host, 'progress_indicator') if not os.isatty(1): result = 0 return result def verify_config_upload_methods(config, host, upload_methods, debug=False): """ Verify the configured upload method for host and default. :param config: The `ConfigParser` instance for this application. :param host: The remote host for which to get the configuration. :param upload_methods: The mapping from upload method name to function. :param debug: If true, enable debugging output. :return: ``None``. If the configured upload method for the host or for default do not verify, the program exits. 
""" if debug: sys.stdout.write( "D: Default Method: {}\n".format( config.get('DEFAULT', 'method'))) if config.get('DEFAULT', 'method') not in upload_methods: sys.stdout.write( "Unknown upload method: {}\n".format( config.get('DEFAULT', 'method'))) sys.exit(1) if debug: sys.stdout.write( "D: Host Method: {}\n".format(config.get(host, 'method'))) if config.get(host, 'method') not in upload_methods: sys.stdout.write( "Unknown upload method: {}\n".format( config.get(host, 'method'))) sys.exit(1) def dinstall_caller(filename, host, fqdn, login, incoming, debug): """ Run ???dinstall??? for the package on the remote host. :param filename: Debian package filename to install. :param host: Configuration host name. :param fqdn: Fully-qualified domain name of the remote host. :param login: Username for login to the remote host. :param incoming: Filesystem path on remote host for incoming packages. :param debug: If true, enable debugging output. :return: ``None``. Run ???dinstall??? on the remote host in test mode, and present the output to the user. This is so the user can see if the package would be installed or not. """ command = [ "ssh", "{login}@{fqdn}".format(**vars()), "cd", str(incoming), ";", "dinstall", "-n", str(filename)] if debug: sys.stdout.write( "D: Logging into {login}@{host}:{incoming}\n".format(**vars())) sys.stdout.write("D: dinstall -n {}\n".format(filename)) if dputhelper.check_call(command) != dputhelper.EXIT_STATUS_SUCCESS: sys.stdout.write( "Error occured while trying to connect, or while" " attempting to run dinstall.\n") sys.exit(1) def version_check(path, changes, debug): """ Check if the caller has installed the package also on his system. This is for testing purposes before uploading it. If not, we reject the upload. 
""" packages_to_check = [] # Get arch dpkg_proc = subprocess.Popen( "dpkg --print-architecture", stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True) dpkg_stdout = dputhelper.make_text_stream(dpkg_proc.stdout) dpkg_stderr = dputhelper.make_text_stream(dpkg_proc.stderr) dpkg_output = dpkg_stdout.read() dpkg_architecture = dpkg_output.strip() dpkg_stdout.close() dpkg_stderr_output = dpkg_stderr.read() dpkg_stderr.close() if debug and dpkg_stderr_output: sys.stdout.write(( "D: dpkg-architecture stderr output:" " {!r}\n").format(dpkg_stderr_output)) if debug: sys.stdout.write( "D: detected architecture: '{}'\n".format(dpkg_architecture)) # Get filenames of deb files: for file_spec in changes['files'].strip().split("\n"): file_name = os.path.join(path, file_spec.split()[4]) if file_name.endswith(".deb"): if debug: sys.stdout.write("D: Debian Package: {}\n".format(file_name)) dpkg_proc = subprocess.Popen( "dpkg --field {}".format(file_name), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True) dpkg_stdout = dputhelper.make_text_stream(dpkg_proc.stdout) dpkg_stderr = dputhelper.make_text_stream(dpkg_proc.stderr) dpkg_output = dpkg_stdout.read() dpkg_stdout.close() dpkg_fields = email.parser.HeaderParser().parsestr(dpkg_output) dpkg_stderr_output = dpkg_stderr.read() dpkg_stderr.close() if debug and dpkg_stderr_output: sys.stdout.write(( "D: dpkg stderr output:" " {!r}\n").format(dpkg_stderr_output)) if ( dpkg_architecture and dpkg_fields['architecture'] not in [ 'all', dpkg_architecture]): if debug: sys.stdout.write(( "D: not install-checking {} due to arch mismatch\n" ).format(file_name)) else: package_name = dpkg_fields['package'] version_number = dpkg_fields['version'] if debug: sys.stdout.write( "D: Package to Check: {}\n".format(package_name)) if debug: sys.stdout.write( "D: Version to Check: {}\n".format(version_number)) packages_to_check.append((package_name, version_number)) for package_name, version_to_check in 
packages_to_check: if debug: sys.stdout.write("D: Name of Package: {}\n".format(package_name)) dpkg_proc = subprocess.Popen( "dpkg -s {}".format(package_name), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True) dpkg_stdout = dputhelper.make_text_stream(dpkg_proc.stdout) dpkg_stderr = dputhelper.make_text_stream(dpkg_proc.stderr) dpkg_output = dpkg_stdout.read() dpkg_stdout.close() dpkg_fields = email.parser.HeaderParser().parsestr(dpkg_output) dpkg_stderr_output = dpkg_stderr.read() dpkg_stderr.close() if debug and dpkg_stderr_output: sys.stdout.write(( "D: dpkg stderr output:" " {!r}\n").format(dpkg_stderr_output)) if 'version' in dpkg_fields: installed_version = dpkg_fields['version'] if debug: sys.stdout.write( "D: Installed-Version: {}\n".format(installed_version)) if debug: sys.stdout.write( "D: Check-Version: {}\n".format(version_to_check)) if installed_version != version_to_check: sys.stdout.write( "Package to upload is not installed, but it appears" " you have an older version installed.\n") else: sys.stdout.write( "Uninstalled Package. Test it before uploading it.\n") sys.exit(1) def execute_command(command, position, debug=False): """ Run `command` in a child shell. :param command: Command line to execute. :param position: Position of the command: 'pre' or 'post'. :param debug: If true, enable debugging output. :return: ``None``. """ if debug: sys.stdout.write("D: Command: {}\n".format(command)) if subprocess.call(command, shell=True): raise dputhelper.DputUploadFatalException( "Error: {} upload command failed.".format(position)) def check_upload_logfile( changes_file_path, host, fqdn, check_only, run_lintian, force_upload, debug): """ Check if the user already put this package on the specified host. :param changes_file_path: Filesystem path of upload control file. :param host: Configuration host name. :param fqdn: Fully-qualified domain name of the remote host. :param check_only: If true, no upload is requested. 
def make_usage_message():
    """ Make the program usage help message.

        :return: The usage help text, as a single string that ends
            with a newline.
        """
    text = textwrap.dedent("""\
        Usage: dput [options] [host] <package(s).changes>
         Supported options (see man page for long forms):
           -c: Config file to parse.
           -d: Enable debug messages.
           -D: Run dinstall after upload.
           -e: Upload to a delayed queue. Takes an argument from 0 to 15.
           -f: Force an upload.
           -h: Display this help message.
           -H: Display a list of hosts from the config file.
           -l: Run lintian before upload.
           -U: Do not write a .upload file after uploading.
           -o: Only check the package.
           -p: Print the configuration.
           -P: Use passive mode for ftp uploads.
           -s: Simulate the upload only.
           -u: Don't check GnuPG signature.
           -v: Display version information.
           -V: Check the package version and then upload it.
        """)
    return text


def make_delayed_queue_path(days=None):
    """ Make the relative DELAYED queue path for the specified delay.

        :param days: Number of days (integer), or ``None`` for no
            delay.
        :return: The full queue name (for example "DELAYED/7-day"),
            if `days` is a number; otherwise, the empty string.

        A value of 0 is a valid delay and yields "DELAYED/0-day"; only
        ``None`` means that no delayed queue is requested.
        """
    queue_fullname = ""
    if days is not None:
        queue_name = "{days:d}-day".format(days=days)
        queue_fullname = "DELAYED/{queue}".format(queue=queue_name)
    return queue_fullname


def upload_files_via_simulate(
        method,
        config, host, fqdn, login, incoming, files_to_upload,
        debug=False):
    """ Simulate the upload of files.

        :param method: Name of requested upload method.
        :param config: `ConfigParser` instance for this application.
        :param host: Configuration host name.
        :param fqdn: Fully-qualified domain name of remote host.
        :param login: Username to use for login to remote host.
        :param incoming: Directory on remote host for incoming files.
        :param files_to_upload: Collection of file names to upload.
        :param debug: If true, enable debugging output.
        :return: ``None``.

        The upload will not actually be performed, but diagnostic
        messages will be emitted to standard error: one notice that
        the upload is simulated, then one line per file.
        """
    sys.stderr.write(
            "Not performing upload: ‘simulate’ option specified.\n")
    for file_path in files_to_upload:
        # Name the fields explicitly instead of formatting from the
        # whole local namespace.
        sys.stderr.write((
                "Uploading with {method}: {file_path}"
                " to {fqdn}:{incoming}\n"
                ).format(
                    method=method, file_path=file_path,
                    fqdn=fqdn, incoming=incoming))


def upload_files_via_method_local(
        method_name, method,
        config, host, fqdn, login, incoming, files_to_upload,
        debug,
        compress=False, progress=0):
    """ Upload files via ‘local’ method.

        :param method_name: Name of requested upload method.
        :param method: Callable upload method to invoke.
        :param config: `ConfigParser` instance for this application.
        :param host: Configuration host name.
        :param fqdn: Fully-qualified domain name of remote host.
        :param login: Username to use for login to remote host.
        :param incoming: Directory on remote host for incoming files.
        :param files_to_upload: Collection of file names to upload.
        :param debug: If true, enable debugging output.
        :param compress: Legacy dummy parameter.
        :param progress: Code specifying progress indicator to use.
        :return: ``None``.
        """
    # `compress` and `progress` are handed over positionally.
    method(
            fqdn, login, incoming,
            files_to_upload, debug,
            compress, progress)


def upload_files_via_method_ftp(
        method_name, method,
        config, host, fqdn, login, incoming, files_to_upload,
        debug,
        commandline_passive=None, progress=0):
    """ Upload files via ‘ftp’ method.

        :param method_name: Name of requested upload method.
        :param method: Callable upload method to invoke.
        :param config: `ConfigParser` instance for this application.
        :param host: Configuration host name.
        :param fqdn: Fully-qualified domain name of remote host.
        :param login: Username to use for login to remote host.
        :param incoming: Directory on remote host for incoming files.
        :param files_to_upload: Collection of file names to upload.
        :param debug: If true, enable debugging output.
        :param commandline_passive: Boolean value specifying whether
            the command line option "--passive" was received; ``None``
            if the command line did not specify.
        :param progress: Code specifying progress indicator to use.
        :return: ``None``.
        """
    port = get_port_for_host(config, host)
    if not port:
        # No port configured for the host: fall back to port 21.
        port = 21

    # The host configuration supplies the mode, unless the command
    # line explicitly chose one.
    ftp_passive = config.getboolean(host, 'passive_ftp')
    if commandline_passive is not None:
        ftp_passive = commandline_passive
    ftp_mode_name = ["active", "passive"][ftp_passive]

    if debug:
        sys.stdout.write("D: FTP port: {port:d}\n".format(port=port))
        sys.stdout.write("D: Using {mode} ftp\n".format(mode=ftp_mode_name))

    method(
            fqdn, login, incoming,
            files_to_upload, debug,
            ftp_mode=ftp_passive, progress=progress, port=port)


def upload_files_via_method_scp(
        method_name, method,
        config, host, fqdn, login, incoming, files_to_upload,
        debug,
        compress=False, ssh_config_options=""):
    """ Upload files via ‘scp’ method.

        :param method_name: Name of requested upload method.
        :param method: Callable upload method to invoke.
        :param config: `ConfigParser` instance for this application.
        :param host: Configuration host name.
        :param fqdn: Fully-qualified domain name of remote host.
        :param login: Username to use for login to remote host.
        :param incoming: Directory on remote host for incoming files.
        :param files_to_upload: Collection of file names to upload.
        :param debug: If true, enable debugging output.
        :param compress: Boolean value specifying whether to enable
            SSH compression.
        :param ssh_config_options: SSH configuration options to pass
            on SSH command line: either newline-separated text, or a
            sequence of option strings.
        :return: ``None``.

        Each option is stripped of surrounding whitespace, and empty
        options are discarded, before the list is passed to `method`.
        """
    # Accept both a text value and an already-split sequence: calling
    # `split` on a list raises AttributeError.
    if not ssh_config_options:
        option_lines = []
    elif isinstance(ssh_config_options, str):
        option_lines = ssh_config_options.split("\n")
    else:
        option_lines = list(ssh_config_options)
    ssh_options = [
            option for option in (
                line.strip() for line in option_lines)
            if option]

    if debug:
        if compress:
            sys.stdout.write("D: Setting compression for scp\n")
        sys.stdout.write("\n ".join(
                ["D: ssh config options:", ""] + ssh_options + [""]))

    method(
            fqdn, login, incoming,
            files_to_upload, debug,
            compress=compress, ssh_config_options=ssh_options)


def upload_files_via_method(
        method_name, method,
        config, host, fqdn, login, incoming, files_to_upload,
        debug,
        dummy=0, progress=0):
    """ Upload files via specified method.

        :param method_name: Name of requested upload method.
        :param method: Callable upload method to invoke.
        :param config: `ConfigParser` instance for this application.
        :param host: Configuration host name.
        :param fqdn: Fully-qualified domain name of remote host.
        :param login: Username to use for login to remote host.
        :param incoming: Directory on remote host for incoming files.
        :param files_to_upload: Collection of file names to upload.
        :param debug: If true, enable debugging output.
        :param dummy: Legacy dummy parameter.
        :param progress: Code specifying progress indicator to use.
        :return: ``None``.
        """
    method(
            fqdn, login, incoming,
            files_to_upload, debug,
            dummy=dummy, progress=progress)
:param simulate: If true, simulate the upload only. :param delay_days: Integer number of days for incoming delay queue. :param commandline_passive: If true, enable FTP passive mode. :param debug: If true, enable debugging output. :return: ``None``. Upload the specified files to the host, using the method and credentials from the configuration for the host. """ method = config.get(host, 'method') fqdn = get_fqdn_for_host(config, host) login = get_login_for_host(config, host, debug=debug) incoming = get_incoming_for_host(config, host) delayed_queue = make_delayed_queue_path(days=delay_days) if delayed_queue: incoming = os.path.join(incoming, delayed_queue) progress = get_progress_indicator_for_host(config, host) if not simulate: destination_description = host if delayed_queue: destination_description += " [{queue}]".format( queue=delayed_queue) sys.stdout.write(( "Uploading to {destination}" " (via {method} to {fqdn}):\n" ).format( destination=destination_description, **vars())) if debug: sys.stdout.write("D: FQDN: {}\n".format(fqdn)) sys.stdout.write("D: Login: {}\n".format(login)) sys.stdout.write("D: Incoming: {}\n".format(incoming)) if method == 'local': upload_func = upload_files_via_method_local upload_func_kwargs = { 'compress': 0, 'progress': progress, } elif method == 'ftp': upload_func = upload_files_via_method_ftp upload_func_kwargs = { 'commandline_passive': commandline_passive, 'progress': progress, } elif method == 'scp': scp_compress = config.getboolean(host, 'scp_compress') ssh_config_options = [ option_text for option_text in ( line.strip() for line in config.get(host, 'ssh_config_options').split("\n")) if option_text] upload_func = upload_files_via_method_scp upload_func_kwargs = { 'compress': scp_compress, 'ssh_config_options': ssh_config_options, } else: upload_func = upload_files_via_method upload_func_kwargs = { 'dummy': 0, 'progress': progress, } upload_func( method, upload_methods[method], config, host, fqdn, login, incoming, files_to_upload, 
def parse_commandline(argv):
    """ Parse the command-line arguments to option values.

        :param argv: The command-line arguments, as a sequence of text
            strings.
        :return: A 2-tuple (`options`, `args`), where `options` is the
            collection of option values; `args` is the remaining
            arguments after removing all options.

        The `options` collection is a `SimpleNamespace` instance, with
        each value represented as a named attribute. The initial
        values come from `parse_commandline.options_defaults`.

        The function exits the program (raising `SystemExit`) when
        help or version output is requested, when the options cannot
        be parsed, or when a required positional argument is missing.
        As a side effect, the "--debug" option also sets the
        module-level `debug` flag.
        """
    global debug

    options = types.SimpleNamespace(**parse_commandline.options_defaults)

    progname = dputhelper.get_progname(argv)
    version = dputhelper.get_distribution_version()

    # Parse Command Line Options.
    try:
        (opts, args) = dputhelper.getopt(
                argv[1:],
                "c:dDe:fhHlUopPsuvV",
                [
                    "debug", "dinstall", "check-only",
                    "check-version", "config=", "force", "help",
                    "host-list", "lintian", "no-upload-log",
                    "passive", "print", "simulate", "unchecked",
                    "delayed=", "version"])
    except dputhelper.DputException as exc:
        sys.stderr.write("{}\n".format(exc))
        sys.exit(os.EX_USAGE)

    # Default to requiring at least one positional argument.
    positional_args_expected = True

    for option, arg in opts:
        if option in ("-h", "--help"):
            sys.stdout.write(make_usage_message())
            sys.exit(os.EX_OK)
        elif option in ("-v", "--version"):
            sys.stdout.write("{progname} {version}\n".format(
                    progname=progname, version=version))
            sys.exit(os.EX_OK)
        elif option in ("-d", "--debug"):
            debug = True
            options.debug = True
        elif option in ("-D", "--dinstall"):
            options.run_dinstall = True
        elif option in ("-c", "--config"):
            options.config_file_path = arg
        elif option in ("-f", "--force"):
            options.force_upload = True
        elif option in ("-H", "--host-list"):
            # Listing hosts needs no package or host argument.
            options.config_host_list = True
            positional_args_expected = False
        elif option in ("-l", "--lintian"):
            options.run_lintian = True
        elif option in ("-U", "--no-upload-log"):
            options.upload_log = False
        elif option in ("-o", "--check-only"):
            options.check_only = True
        elif option in ("-p", "--print"):
            # Printing the configuration needs no package or host
            # argument.
            options.config_print = True
            positional_args_expected = False
        elif option in ("-P", "--passive"):
            options.commandline_passive = True
        elif option in ("-s", "--simulate"):
            options.simulate = True
        elif option in ("-u", "--unchecked"):
            options.allow_unsigned_uploads = True
        elif option in ("-e", "--delayed"):
            # Kept as text here; converted to a number by the caller.
            options.delay_days_text = arg
        elif option in ("-V", "--check-version"):
            options.check_version = True
        # Legacy mis-spelled option name.
        elif option == "--check_version":
            options.check_version = True

    if ((len(args) < 1) and positional_args_expected):
        sys.stdout.write(
                "No package or host has been provided, see dput -h\n")
        sys.exit(os.EX_USAGE)

    return (options, args)


# Initial value of every option attribute, before the command line is
# parsed. The 'debug' value is read from the module-level flag at the
# time this mapping is created.
parse_commandline.options_defaults = {
        'debug': debug,
        'check_version': False,
        'config_print': False,
        'force_upload': False,
        'run_lintian': False,
        'config_host_list': False,
        'commandline_passive': None,
        'preferred_host': "",
        'config_file_path': "",
        'run_dinstall': False,
        'upload_log': True,
        'check_only': False,
        'allow_unsigned_uploads': False,
        'delay_days_text': None,
        'simulate': False,
        }


def main():
    """ Mainline function for this program.

        :return: The exit status requested by `parse_commandline`, if
            that function exits the program; otherwise ``None``.

        Parse the command line, read the configuration, then for each
        named ‘*.changes’ file: choose the upload host, check the
        upload log, verify the files, optionally run Lintian, run the
        configured pre-upload command, upload (or simulate), write the
        upload log, optionally run dinstall, and run the configured
        post-upload command.

        Most failures terminate the program directly via `sys.exit`.
        """
    progname = dputhelper.get_progname()
    version = dputhelper.get_distribution_version()

    # Emit the program version in the debug output.
    if debug:
        sys.stdout.write(
                "D: {progname} {version}\n".format(
                    progname=progname, version=version))

    # An exit requested during option parsing becomes the return value
    # of this function, instead of propagating.
    try:
        (options, args) = parse_commandline(sys.argv)
    except SystemExit as exc:
        return exc.code

    login = dputhelper.get_username_from_system(debug=debug)

    # Open and parse all relevant configuration files.
    try:
        config_files = list(configfile.active_config_files(
                options.config_file_path, options.debug))
        config = read_configs(config_files, options.debug)
    except configfile.ConfigurationError as exc:
        sys.stderr.write("Error: {0}\n".format(str(exc)))
        sys.stderr.write("Failed to read program configuration; aborting.\n")
        sys.exit(1)

    if options.config_print:
        print_config(config, options.debug)
        sys.exit(os.EX_OK)

    if options.config_host_list:
        print_default_upload_method(config)
        print_host_list(config)
        sys.exit(os.EX_OK)

    # Process further command line options.
    if len(args) == 1 and not options.check_only:
        changes_file_paths = args[0:]
    else:
        if not options.check_only:
            if options.debug:
                sys.stdout.write(
                        "D: Checking if a host was named"
                        " on the command line.\n")
            if config.has_section(args[0]):
                if options.debug:
                    sys.stdout.write(
                            "D: Host {} found in config.\n".format(args[0]))
                # Host was also named, so only the rest will be a list
                # of packages to upload.
                options.preferred_host = args[0]
                changes_file_paths = args[1:]
            elif (
                    not config.has_section(args[0])
                    and not args[0].endswith(".changes")):
                sys.stderr.write(
                        "No host {} found in config.\n".format(args[0]))
                sys.exit(1)
            else:
                if options.debug:
                    sys.stdout.write("D: No host named on command line.\n")
                # Only packages have been named on the command line.
                options.preferred_host = ""
                changes_file_paths = args[0:]
        else:
            if options.debug:
                sys.stdout.write("D: Checking for the package name.\n")
            if config.has_section(args[0]):
                sys.stdout.write(
                        "D: Host {} found in config.\n".format(args[0]))
                options.preferred_host = args[0]
                changes_file_paths = args[1:]
            else:
                sys.stderr.write(
                        "No host {} found in config.\n".format(args[0]))
                if options.debug:
                    sys.stdout.write("D: No host named on command line.\n")
                changes_file_paths = args[0:]

    upload_methods = import_upload_functions()

    # Run the same checks for all packages that have been given on
    # the command line
    for changes_file_path in changes_file_paths:
        # Check that a .changes file was given on the command line
        # and no matching .upload file exists.
        if (not changes_file_path.endswith(".changes")):
            sys.stderr.write(
                    "Filename does not match ‘*.changes’: {}\n".format(
                        changes_file_path))
            sys.exit(1)

        # Construct the package name for further usage.
        changes_file_directory, changes_file_name = os.path.split(
                changes_file_path)
        if changes_file_directory == "":
            changes_file_directory = os.getcwd()

        # Define the host to upload to.
        if options.preferred_host:
            host = options.preferred_host
        else:
            host = guess_upload_host(
                    changes_file_directory, changes_file_name, config)
        fqdn = get_fqdn_for_host(config, host)

        # Check if we already did this upload or not.
        check_upload_logfile(
                changes_file_path, host, fqdn,
                options.check_only, options.run_lintian,
                options.force_upload, options.debug)

        # Run the change file tests.
        files_to_upload = verify_files(
                changes_file_directory, changes_file_name, host,
                config, options.check_only, options.check_version,
                options.allow_unsigned_uploads, options.debug)

        # Run the lintian test if the user asked us to do so.
        if (
                options.run_lintian
                or config.getboolean(host, 'run_lintian') == 1):
            run_lintian_test(
                    os.path.join(changes_file_directory, changes_file_name))

        # Don't upload, skip to the next item.
        if options.check_only:
            sys.stdout.write("Package checked by dput.\n")
            continue

        # Pre-Upload Commands
        if len(config.get(host, 'pre_upload_command')) != 0:
            position = 'pre'
            command = config.get(host, 'pre_upload_command')
            execute_command(command, position, options.debug)

        verify_config_upload_methods(
                config, host, upload_methods, debug=debug)

        login = get_login_for_host(config, host, debug=debug)
        incoming = get_incoming_for_host(config, host)

        if (
                options.delay_days_text is None
                and config.has_option(host, 'delayed')):
            # Command-line option for delay days not specified.
            options.delay_days_text = config.get(host, 'delayed')
        try:
            options.delay_days = get_delayed_days(options.delay_days_text)
        except ValueError as exc:
            sys.stdout.write(
                    "Incorrect delayed argument: {exc_class}: {exc}\n".format(
                        exc_class=type(exc).__name__, exc=exc))
            sys.exit(os.EX_USAGE)

        # Do the actual upload.
        upload_files(
                upload_methods, config, host, files_to_upload,
                simulate=options.simulate,
                delay_days=options.delay_days,
                commandline_passive=options.commandline_passive,
                debug=options.debug)

        # Create the logfile after the package has
        # been put into the archive.
        if not options.simulate:
            if options.upload_log:
                create_upload_file(
                        changes_file_name, host, fqdn,
                        changes_file_directory, files_to_upload,
                        options.debug)
            sys.stdout.write("Successfully uploaded packages.\n")
        else:
            sys.stdout.write("Simulated upload.\n")

        # Run dinstall if the user asked us to do so.
        if options.debug:
            sys.stdout.write(
                    "D: run_dinstall: {}\n".format(options.run_dinstall))
            sys.stdout.write(
                    "D: Host Config: {}\n".format(
                        config.getboolean(host, 'run_dinstall')))
        if (
                config.getboolean(host, 'run_dinstall') == 1
                or options.run_dinstall):
            if not options.simulate:
                dinstall_caller(
                        changes_file_name, host, fqdn, login, incoming,
                        options.debug)
            else:
                sys.stderr.write(
                        "Not running ‘dinstall’: ‘simulate’ option"
                        " specified.\n")

        # Post-Upload Command
        if len(config.get(host, 'post_upload_command')) != 0:
            position = 'post'
            command = config.get(host, 'post_upload_command')
            execute_command(command, position, options.debug)


# Copyright © 2015–2024 Ben Finney <bign...@debian.org>
# Copyright © 2008–2013 Y Giridhar Appaji Nag <app...@debian.org>
# Copyright © 2006–2008 Thomas Viehmann <t...@beamnet.de>
# Copyright © 2000–2005 Christian Kurz <sho...@debian.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


# Local variables:
# coding: utf-8
# mode: python
# End:
# vim: fileencoding=utf-8 filetype=python :
signature.asc
Description: This is a digitally signed message part.