| Server IP : 172.67.191.97 / Your IP : 104.23.197.209 Web Server : Apache/2.4.63 (Ubuntu) System : Linux adminpruebas-Virtual-Machine 6.14.0-37-generic #37-Ubuntu SMP PREEMPT_DYNAMIC Fri Nov 14 22:10:32 UTC 2025 x86_64 User : www-data ( 33) PHP Version : 8.4.5 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /usr/bin/X11/X11/X11/ |
Upload File : |
#!/usr/bin/python3
import apt_pkg
import io
import os
import re
import sys
import gettext
import locale
import argparse
import subprocess
import tempfile
from softwareproperties.shortcuthandler import ShortcutException
from softwareproperties.shortcuts import shortcut_handler
from softwareproperties.ppa import PPAShortcutHandler
from softwareproperties.cloudarchive import CloudArchiveShortcutHandler
from softwareproperties.sourceslist import SourcesListShortcutHandler
from softwareproperties.uri import URIShortcutHandler
from softwareproperties.sourceutils import *
from aptsources.sourceslist import SourcesList, SourceEntry
from aptsources.distro import get_distro
from copy import copy
from gettext import gettext as _, ngettext
apt_pkg.init()
SOURCESLIST = apt_pkg.config.find_dir("Dir::Etc::sourceparts") + get_distro().id.lower() + ".sources"
if not os.path.exists(SOURCESLIST):
SOURCESLIST = apt_pkg.config.find_file("Dir::Etc::sourcelist")
class AddAptRepository(object):
def __init__(self):
gettext.textdomain("software-properties")
self.distro = get_distro()
self.sourceslist = SourcesList(deb822=True)
self.distro.get_sources(self.sourceslist)
def parse_args(self, args):
description = "Only ONE of -P, -C, -U, -S, or old-style 'line' can be specified"
parser = argparse.ArgumentParser(description=description)
parser.add_argument("-d", "--debug", action="store_true",
help=_("Print debug"))
parser.add_argument("-r", "--remove", action="store_true",
help=_("Disable repository"))
parser.add_argument("-s", "--enable-source", action="count", default=0,
help=_("Allow downloading of the source packages from the repository"))
parser.add_argument("-c", "--component", action="append", default=[],
help=_("Components to use with the repository"))
parser.add_argument("-p", "--pocket",
help=_("Add entry for this pocket"))
parser.add_argument("-y", "--yes", action="store_true",
help=_("Assume yes to all queries"))
parser.add_argument("-n", "--no-update", dest="update", action="store_false",
help=_("Do not update package cache after adding"))
parser.add_argument("-u", "--update", action="store_true", default=True,
help=argparse.SUPPRESS)
parser.add_argument("-l", "--login", action="store_true",
help=_("Login to Launchpad."))
parser.add_argument("--dry-run", action="store_true",
help=_("Don't actually make any changes."))
group = parser.add_mutually_exclusive_group()
group.add_argument("--refresh-keys", action="store_true",
help=_("Refresh signing keys for currently configured PPAs"))
group.add_argument("-L", "--list", action="store_true",
help=_("List currently configured repositories"))
group.add_argument("-P", "--ppa",
help=_("PPA to add"))
group.add_argument("-C", "--cloud",
help=_("Cloud Archive to add"))
group.add_argument("-U", "--uri",
help=_("Archive URI to add"))
group.add_argument("-S", "--sourceslist", nargs='+', default=[],
help=_("Full sources.list entry line to add"))
group.add_argument("line", nargs='*', default=[],
help=_("sources.list line to add (deprecated)"))
self.parser = parser
self.options = self.parser.parse_args(args)
@property
def dry_run(self):
return self.options.dry_run
@property
def enable_source(self):
return self.options.enable_source > 0
@property
def components(self):
return self.options.component
@property
def pocket(self):
if self.options.pocket:
return self.options.pocket.lower()
return None
@property
def source_type(self):
return self.distro.source_type
@property
def binary_type(self):
return self.distro.binary_type
def is_components(self, comps):
if not comps:
return False
return set(comps.split()) <= set([comp.name for comp in self.distro.source_template.components])
def apt_update(self):
if self.options.update and not self.dry_run:
# We prefer to run apt-get update here. The built-in update support
# does not have any progress, and only works for shortcuts. Moving
# it to something like save() and using apt.progress.text would
# solve the problem, but the new errors might cause problems with
# the dbus server or other users of the API. Also, it's unclear
# how good the text progress is or how to pass it best.
subprocess.run(['apt-get', 'update'])
def prompt_user(self):
if self.dry_run:
print(_("DRY-RUN mode: no modifications will be made"))
return
if not self.options.yes and sys.stdin.isatty() and not "FORCE_ADD_APT_REPOSITORY" in os.environ:
try:
input(_("Press [ENTER] to continue or Ctrl-c to cancel."))
except KeyboardInterrupt:
print(_("Aborted."))
sys.exit(1)
def prompt_user_shortcut(self, shortcut):
'''Display more information about the shortcut / ppa info'''
print(_("Repository: '%s'") % shortcut.SourceEntry().line)
if shortcut.description:
# strip ANSI escape sequences
description = re.sub(r"(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]",
"", shortcut.description)
print(_("Description:"))
print(description)
if shortcut.web_link:
print(_("More info: %s") % shortcut.web_link)
if self.options.remove:
print(_("Removing repository."))
else:
print(_("Adding repository."))
self.prompt_user()
def _add_components(self, sources):
components = {}
for entry in sources:
key = (entry.uri, entry.dist, entry.type)
if key in components:
components[key] += entry.comps
else:
components[key] = copy(entry.comps)
for (uri, dist, type), comps in components.items():
added = set(self.components) - set(comps)
if added:
comps = list(set(comps) | set(self.components))
entry = self.sourceslist.add(
type=type,
dist=dist,
uri=uri,
orig_comps=comps,
file=SOURCESLIST,
comment="Added by software-properties",
)
print(_("Added %s to: %s") % (' '.join(added), str(entry)))
def _remove_components(self, sources):
for entry in sources:
removed = set(entry.comps) & set(self.components)
if removed:
entry.comps = list(set(entry.comps) - set(self.components))
print(_("Removed %s from: %s") % (' '.join(removed), str(entry)))
if not entry.comps:
self.sourceslist.remove(entry)
def change_components(self):
sources = [s for s in self.sourceslist.exploded_list() if not s.invalid \
and not s.disabled and s.file == SOURCESLIST]
if self.options.remove:
self._remove_components(sources)
else:
self._add_components(sources)
if not self.dry_run:
self.sourceslist.save()
def _add_pocket(self, sources):
binary_entries = {}
for s in sources:
if s.invalid or s.disabled:
continue
if s.type != self.binary_type or s.file != SOURCESLIST:
continue
suite = get_source_entry_suite(s)
if (s.uri, suite) in binary_entries:
binary_entries[(s.uri, suite)].append(s)
else:
binary_entries[(s.uri, suite)] = [s]
for (uri, suite), entries in binary_entries.items():
pockets = []
have_pocket = False
have_release = False
for e in entries:
p = get_source_entry_pocket(e)
if p == self.pocket:
print(_("Existing: %s") % str(e))
have_pocket = True
elif p == 'release':
have_release = True
pockets.append(p)
if have_pocket:
continue
if uri.startswith("http://security.ubuntu.com"):
continue
if have_release:
comps = []
for e in entries:
if get_source_entry_pocket(e) == 'release':
comps += e.comps
comps = list(set(comps))
else:
comps = ['main', 'restricted']
entry = self.sourceslist.add(
type=self.binary_type,
uri=uri,
dist='{}-{}'.format(suite, self.pocket),
orig_comps=comps,
file=SOURCESLIST,
comment="Added by software-properties",
parent=entries[0] if entries else None,
)
print(_("Adding: %s") % str(entry))
def _remove_pocket(self, sources):
for entry in sources:
if entry.invalid or entry.disabled or entry.file != SOURCESLIST:
continue
if get_source_entry_pocket(entry) != self.pocket:
continue
self.sourceslist.remove(entry)
print(_("Removed: %s") % str(entry))
def change_pocket(self):
exploded = self.sourceslist.exploded_list()
if self.options.remove:
self._remove_pocket(exploded)
else:
self._add_pocket(exploded)
if not self.dry_run:
self.sourceslist.save()
def _enable_source(self):
# Check each disabled deb-src line, enable if matching deb line exists
for s in self.sourceslist.exploded_list():
if s.invalid or not s.disabled or s.type != self.source_type:
continue
b_entry = None
for b in self.sourceslist:
if b.type != self.binary_type or b.disabled:
continue
if (b.uri, b.dist, b.comps) != (s.uri, s.dist, s.comps):
continue
b_entry = b
break
if not b_entry:
# no matching binary lines, leave the source line disabled
continue
disabled_comps = list(set(s.comps) - set(b_entry.comps))
enabled_comps = list(set(s.comps) & set(b_entry.comps))
if not enabled_comps:
# we can't enable any of the line
continue
if disabled_comps:
index = sources.index(s)
tmp = replace_source_entry(s, comps=disabled_comps)
self.sourceslist.list.insert(index + 1, tmp)
s.comps = enabled_comps
s.set_enabled(True)
print(_("Enabled: %s") % str(s).strip())
# Check each enabled deb line, to warn about missing deb-src lines, or add one if -ss
for b in self.sourceslist:
if b.invalid or b.disabled or b.type != self.binary_type:
continue
s = replace_source_entry(b, type=self.source_type)
s_entry = get_source_entry_from_list(self.sourceslist, s)
scomps = set(s_entry.comps if s_entry else [])
missing_comps = list(set(b.comps) - scomps)
if not missing_comps:
continue
s.comps = missing_comps
if self.options.enable_source > 1:
# with multiple -s, add new deb-src entries if needed for all deb entries
self.sourceslist.add(
type=s.type,
uri=s.uri,
dist=s.dist,
orig_comps=s.comps,
file=s.file,
comment="Added by software-properties",
)
print(_("Added: %s") % str(s).strip())
else:
# if only one -s used, don't add missing deb-src, just notify
print(_("Warning, missing deb-src for: %s") % str(s).strip())
def _disable_source(self):
for s in self.sourceslist:
if s.invalid or s.disabled or s.type != self.source_type:
continue
s.set_enabled(False)
print(_("Disabled: %s") % str(s).strip())
def change_source(self):
if self.options.remove:
self._disable_source()
else:
self._enable_source()
if not self.dry_run:
self.sourceslist.save()
def global_change(self):
if self.components:
if self.options.remove:
print(_("Removing component(s) '%s' from all repositories.") % ', '.join(self.components))
else:
print(_("Adding component(s) '%s' to all repositories.") % ', '.join(self.components))
if self.pocket:
if self.options.remove:
print(_("Removing pocket %s for all repositories.") % self.pocket)
else:
print(_("Adding pocket %s for all repositories.") % self.pocket)
if self.enable_source:
if self.options.remove:
print(_("Disabling %s for all repositories.") % self.source_type)
else:
print(_("Enabling %s for all repositories.") % self.source_type)
self.prompt_user()
if self.components:
self.change_components()
if self.pocket:
self.change_pocket()
if self.enable_source:
self.change_source()
def show_list(self):
merged = {}
for s in self.sourceslist:
if s.invalid or s.disabled:
continue
if not self.enable_source and s.type == self.source_type:
continue
k = (s.type, s.uri, s.dist)
if k in merged:
merged[k].comps += list(set(s.comps) - set(merged[k].comps))
else:
merged[k] = s
for s in merged.values():
print(s, '\n')
def keyalgos(self, keys, filter_caps="sS"):
"""Returns a list of key algorithms used.
This is formatted as either the curve name or the algorithm name followed
by key size; for example, ed25519 or rsa2048. The list is deduplicated.
It is possible to filter by capabilities, keys matching any of the
capabilities specified in 'filter_caps' are returned. By default, we
filter for signing keys only.
"""
cmd = 'gpg --show-keys -q --no-options --no-keyring --batch --with-colons'
# yes, --with-fingerprint twice, to print subkey fingerprints
cmd += ' --with-fingerprint' * 2
try:
with tempfile.TemporaryDirectory() as homedir:
cmd += f' --homedir {homedir}'
if not isinstance(keys, bytes):
keys = keys.encode()
stdout = subprocess.run(cmd.split(), check=True, input=keys,
stdout=subprocess.PIPE).stdout.decode()
except subprocess.CalledProcessError as e:
print(_("Warning: gpg error while processing keys:\n%s") % e)
return []
out = set()
# Non-curve algorithms
algos = {"1": "rsa", "2": "rsa", "3": "rsa", "16": "ElGamal", "17": "dsa"}
for line in stdout.splitlines():
fields = line.split(":")
if fields[0] not in ("pub", "sub"):
continue
try:
length, algo = fields[2], fields[3]
caps = fields[11] if len(fields) > 11 else ""
curve = fields[16] if len(fields) > 16 else None
except KeyError:
print(_("Warning: invalid gpg output:\n%s") % stdout)
continue
if caps and filter_caps and not any(c in caps for c in filter_caps):
continue
if curve:
out.add(curve)
elif algo in algos:
out.add(algos[algo] + str(length))
else:
out.add("unknown-algorithm:" + algo)
return sorted(out)
def refresh_signing_keys(self):
ppa_matcher = re.compile("https?://ppa.launchpad(content)?.net/(.*)/(.*)/ubuntu.*")
refresh = []
# Handlers to refresh key files for or remove key files from
to_refresh = []
to_remove = []
known_parts = set()
unknown_repos = []
# Cache the launchpad object so we don't send multiple login requests
cached_lp = None
for s in self.sourceslist:
for uri in s.uris:
match = ppa_matcher.match(s.uri)
if match:
break
if not match:
if hasattr(s, "section") and s.section.get("Signed-By"):
signed_by = s.section["Signed-By"].strip()
elif not hasattr(s, "section") and re.search("\\[.*signed-by.*\\]", s.line):
signed_by = re.search("signed-by=(\\S*)", s.line)[1].strip()
else:
signed_by = ""
if not signed_by or signed_by.startswith("/usr/share/keyrings/ubuntu-"):
continue
algos = None
if "-----BEGIN PGP PUBLIC KEY BLOCK-----" in signed_by:
keys = "\n".join(line if line.strip() != "." else "" for line in signed_by.split("\n")).strip()
algos = self.keyalgos(keys)
elif os.path.exists(signed_by):
with open(signed_by, "rb") as keyfile:
algos = self.keyalgos(keyfile.read())
if algos:
unknown_repos.append(f"{s.uri} ({', '.join(algos)})")
else:
unknown_repos.append(s.uri)
continue
shortcut = f"ppa:{match[2]}/{match[3]}"
print(_("Checking signing key for %s...") % shortcut)
try:
handler = PPAShortcutHandler(shortcut, login=self.options.login, lp=cached_lp)
cached_lp = handler.lp
known_parts.add(os.path.basename(handler.trustedparts_file))
if hasattr(s, "section") and "-----BEGIN PGP PUBLIC KEY BLOCK-----" in s.section.get("Signed-By"):
# Encoding copied from softwareproperties/shortcuthandler.py
lines = handler.trustedparts_content.splitlines()
lines = [' ' + (l if l.strip() else '.') for l in lines]
new = '\n' + '\n'.join(lines)
if new.strip() != s.section["Signed-By"].strip():
refresh.append(shortcut)
s.section["Signed-By"] = new
# A legacy trusted.gpg.d key exists, remove it
if os.path.exists(handler.trustedparts_file):
to_remove.append(handler)
elif os.path.exists(handler.trustedparts_file):
# Disable deb822 processing, we are storing the key in trusted.gpg.d
handler.deb822 = False
with open(handler.trustedparts_file, "rb") as file:
if handler.fingerprints(handler.trustedparts_content) != handler.fingerprints(file.read()):
refresh.append(shortcut)
to_refresh.append(handler)
else:
unknown_repos.append(s.uri)
except Exception as e:
print(_("Error: Could not check signing key: %s") % e)
print()
if unknown_repos:
print(_("The following repositories with Signed-By cannot be refreshed automatically:"))
for descriptor in unknown_repos:
print(" -", descriptor)
print()
interesting_part = lambda f: (f.endswith(".asc") or f.endswith(".gpg")) and not f.startswith("ubuntu-pro") and not f.startswith("ubuntu-keyring")
unknown_parts = set(f for f in os.listdir("/etc/apt/trusted.gpg.d") if interesting_part(f)) - known_parts
if unknown_parts:
print(_("The following trusted.gpg.d keys cannot be refreshed automatically:"))
for file in unknown_parts:
algos = None
with open("/etc/apt/trusted.gpg.d/" + file, "rb") as keyfile:
algos = self.keyalgos(keyfile.read())
if algos:
print(" -", file, f"({', '.join(algos)})")
else:
print(" -", file)
print()
if not refresh:
print(_("All Launchpad PPA signing keys are up-to-date"))
else:
print(ngettext("Refreshing signing key for %d Launchpad PPA:",
"Refreshing signing keys for %d Launchpad PPAs:", len(refresh)) % len(refresh))
for r in refresh:
print(" -", r)
if to_remove:
print(_("The following legacy signing keys will be disabled:"))
for handler in to_remove:
print(" -", handler.trustedparts_file)
if not self.dry_run and (refresh or to_remove):
self.prompt_user()
self.sourceslist.backup(".save")
for handler in to_remove:
print("Removing", handler.trustedparts_file)
os.rename(handler.trustedparts_file, handler.trustedparts_file + ".save")
for handler in to_refresh:
print("Replacing", handler.trustedparts_file)
os.rename(handler.trustedparts_file, handler.trustedparts_file + ".save")
handler.add_key()
self.sourceslist.save()
def main(self, args=sys.argv[1:]):
self.parse_args(args)
if not any((self.dry_run, self.options.list, os.geteuid() == 0)):
print(_("Error: must run as root"))
return False
line = ' '.join(self.options.line)
if line == '-':
line = sys.stdin.readline().strip()
# if 'line' is only (valid) components, handle as if only -c was used with no line
if self.is_components(line):
self.options.component += line.split()
line = ''
if self.options.ppa:
source = self.options.ppa
if not ':' in source:
source = 'ppa:' + source
handler = PPAShortcutHandler
elif self.options.cloud:
source = self.options.cloud
if not ':' in source:
source = 'uca:' + source
handler = CloudArchiveShortcutHandler
elif self.options.uri:
source = self.options.uri
handler = URIShortcutHandler
elif self.options.sourceslist:
source = ' '.join(self.options.sourceslist)
handler = SourcesListShortcutHandler
elif line:
source = line
handler = shortcut_handler
elif self.options.list:
self.show_list()
return True
elif self.options.refresh_keys:
self.refresh_signing_keys()
return True
elif any((self.enable_source, self.components, self.pocket)):
self.global_change()
self.apt_update()
return True
else:
print(_("Error: no actions requested."))
self.parser.print_help()
return False
try:
shortcut_params = {
'login': self.options.login,
'enable_source': self.enable_source,
'dry_run': self.dry_run,
'components': self.components,
'pocket': self.pocket,
}
shortcut = handler(source, **shortcut_params)
except ShortcutException as e:
print(e)
return False
self.prompt_user_shortcut(shortcut)
if self.options.remove:
shortcut.remove()
else:
shortcut.add()
self.apt_update()
return True
if __name__ == '__main__':
addaptrepo = AddAptRepository()
sys.exit(0 if addaptrepo.main() else 1)