403Webshell
Server IP : 192.64.112.168  /  Your IP : 52.15.113.71
Web Server : Apache
System : Linux nc-ph-2300-85.bluforrest.com 4.18.0-513.9.1.el8_9.x86_64 #1 SMP Sat Dec 2 05:23:44 EST 2023 x86_64
User : expressoneac ( 1128)
PHP Version : 8.0.30
Disable Function : exec,passthru,shell_exec,system
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3.6/site-packages/dnf-plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib/python3.6/site-packages/dnf-plugins/kpatch.py
# kpatch patch module packages management plugin for dnf
#
# Copyright (C) 2020 Julien Thierry <jthierry@redhat.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA,
# 02110-1301, USA.


from __future__ import absolute_import
from __future__ import unicode_literals

import configparser
import os.path
import re

from dnfpluginscore import _, logger

import dnf.callback
import dnf.cli
import dnf.exceptions
import dnf.transaction

# Plugin name; also the base name of its "<name>.conf" configuration file.
KPATCH_PLUGIN_NAME = "kpatch"
# Boolean option in the [main] section of the plugin configuration file that
# switches automatic installation of kpatch-patch packages on or off.
KPATCH_UPDATE_OPT = "autoupdate"

# Name of the kernel package that kpatch-patch package names are derived from.
KERNEL_PKG_NAME = "kernel-core"

# dnf can look up and read the plugin config file, but it provides no way to
# update that file or even to obtain its path, so we locate it ourselves.
def _get_plugin_cfg_file(base_conf):
    """Return the path of the plugin's configuration file, or None.

    Each directory listed in ``base_conf.pluginconfpath`` is probed in order
    for a regular file named ``<KPATCH_PLUGIN_NAME>.conf``; the first hit wins.
    """
    conf_name = '%s.conf' % KPATCH_PLUGIN_NAME
    for conf_dir in base_conf.pluginconfpath:
        candidate = '%s/%s' % (conf_dir, conf_name)
        if os.path.isfile(candidate):
            return candidate
    return None


def _kpp_name_from_kernel_pkg(kernel_pkg):
    kernel_release = re.match(r"(.*)\.el.*", kernel_pkg.release).group(1)
    kpp_kernel_release = kernel_release.replace(".", "_")
    kpp_kernel_version = kernel_pkg.version.replace(".", "_")
    return "kpatch-patch-{}-{}".format(kpp_kernel_version, kpp_kernel_release)


def _install_kpp_pkg(dnf_base, kernel_pkg):
    """Queue installation of the newest kpatch-patch package for a kernel.

    Looks up packages named after ``kernel_pkg`` (same architecture) in the
    sack of ``dnf_base`` and adds the latest one to the goal. The request is
    marked optional unless the ``strict`` configuration option is set.
    """
    wanted_name = _kpp_name_from_kernel_pkg(kernel_pkg)
    sack = dnf_base.sack
    candidates = sack.query().filter(name=wanted_name, arch=kernel_pkg.arch)

    selector = dnf.selector.Selector(sack)
    selector.set(pkg=candidates.latest())

    is_optional = not dnf_base.conf.strict
    dnf_base.goal.install(select=selector, optional=is_optional)


class KpatchCmd(dnf.cli.Command):
    """Implementation of the ``dnf kpatch <action>`` command.

    Supported actions:

    * ``auto``    - install missing kpatch-patch packages and enable the
      automatic update option in the plugin configuration file
    * ``manual``  - disable the automatic update option
    * ``install`` - install missing kpatch-patch packages only
    * ``status``  - log the current setting and the available patches
    """

    aliases = ('kpatch',)
    summary = _('Toggles automatic installation of kpatch-patch packages')


    def __init__(self, cli):
        super(KpatchCmd, self).__init__(cli)
        # Path of the plugin configuration file, or None when it is missing.
        self.cfg_file = _get_plugin_cfg_file(self.base.conf)


    @staticmethod
    def set_argparser(parser):
        """Declare the single positional ``action`` argument."""
        parser.add_argument('action', metavar="auto|manual|install|status")


    def configure(self):
        """Set the dnf demands required by the requested action."""
        demands = self.cli.demands
        demands.root_user = True

        # Only the actions that query or install packages need the sack,
        # the repositories and the resolver.
        uses_packages = self.opts.action in ("auto", "install", "status")
        demands.resolving = uses_packages
        demands.sack_activation = uses_packages
        demands.available_repos = uses_packages


    def _list_missing_kpp_pkgs(self):
        """Return the kpatch-patch packages that could still be installed.

        For every installed kernel the latest matching kpatch-patch package
        is listed, unless a package with that version and release is the one
        already installed. Packages are returned as strings.
        """
        sack = self.base.sack
        missing = []

        for kernel in sack.query().installed().filter(name=KERNEL_PKG_NAME):
            patch_name = _kpp_name_from_kernel_pkg(kernel)
            present = sack.query().installed().filter(name=patch_name).run()

            if present:
                # Drop the candidate when it is the version already installed.
                current = sack.query().filter(name=patch_name,
                                              release=present[0].release,
                                              version=present[0].version)
                candidates = sack.query().filter(
                    name=patch_name, arch=kernel.arch).latest().difference(current)
            else:
                candidates = sack.query().filter(
                    name=patch_name, arch=kernel.arch).latest()

            missing.extend(str(pkg) for pkg in candidates)

        return missing


    def _install_missing_kpp_pkgs(self):
        """Queue a kpatch-patch install for every installed kernel."""
        kernels = self.base.sack.query().installed().filter(name=KERNEL_PKG_NAME)
        for kernel in kernels:
            _install_kpp_pkg(self.base, kernel)


    def _read_conf(self):
        """Parse the plugin configuration file.

        Returns the ConfigParser, or None (after logging a warning) when no
        configuration file was found. Any parsing failure is re-raised as
        dnf.exceptions.Error.
        """
        if self.cfg_file is None:
            logger.warning("Couldn't find configuration file")
            return None

        try:
            parser = configparser.ConfigParser()
            parser.read(self.cfg_file)
        except Exception as e:
            raise dnf.exceptions.Error(_("Parsing file failed: {}").format(str(e)))
        return parser


    def _update_plugin_cfg(self, value):
        """Store ``value`` as the automatic update option in the config file.

        Does nothing but log a warning when there is no configuration file.
        A write failure is re-raised as dnf.exceptions.Error.
        """
        # _read_conf() already logs the "missing file" warning and returns
        # None in that case, so no separate check is needed here.
        conf = self._read_conf()
        if conf is None:
            return None

        if not conf.has_section('main'):
            conf.add_section('main')
        conf.set('main', KPATCH_UPDATE_OPT, str(value))

        try:
            with open(self.cfg_file, 'w') as cfg_stream:
                conf.write(cfg_stream)
        except Exception as e:
            raise dnf.exceptions.Error(_("Failed to update conf file: {}").format(str(e)))


    def run(self):
        """Execute the action given on the command line."""
        action = self.opts.action

        if action == "install":
            self._install_missing_kpp_pkgs()
            return
        if action == "auto":
            self._install_missing_kpp_pkgs()
            self._update_plugin_cfg(True)
            return
        if action == "manual":
            self._update_plugin_cfg(False)
            return
        if action != "status":
            raise dnf.exceptions.Error(_("Invalid argument: {}").format(action))

        # "status": report the configured mode, then any installable patches.
        conf = self._read_conf()
        auto_enabled = (conf is not None
                        and conf.has_section('main')
                        and conf.has_option('main', KPATCH_UPDATE_OPT)
                        and conf.getboolean('main', KPATCH_UPDATE_OPT))
        kp_status = "auto" if auto_enabled else "manual"
        logger.info(_("Kpatch update setting: {}").format(kp_status))

        kpps = self._list_missing_kpp_pkgs()
        if kpps:
            logger.info(_("Available patches: {}").format(", ".join(kpps)))



class KpatchPlugin(dnf.Plugin):
    """dnf plugin that adds kpatch-patch packages to kernel transactions.

    When the automatic update option is enabled in the plugin configuration,
    every transaction that installs or upgrades a kernel package also gets
    the matching kpatch-patch package, unless the user already requested
    that kpatch-patch package explicitly.
    """

    name = KPATCH_PLUGIN_NAME


    def __init__(self, base, cli):
        super(KpatchPlugin, self).__init__(base, cli)
        # True while this plugin re-runs the resolver itself, so that the
        # nested resolved() callback can be ignored.
        self._commiting = False
        # Whether automatic installation is enabled; set by config().
        self._autoupdate = False
        if cli is not None:
            cli.register_command(KpatchCmd)


    def config(self):
        """Read the automatic update option from the plugin configuration.

        A value that cannot be parsed is logged as a warning and leaves the
        option at its previous value.
        """
        parser = self.read_config(self.base.conf)
        try:
            self._autoupdate = (parser.has_section('main')
                                and parser.has_option('main', KPATCH_UPDATE_OPT)
                                and parser.getboolean('main', KPATCH_UPDATE_OPT))
        except Exception as e:
            logger.warning(_("Parsing file failed: {}").format(str(e)))


    def _commit_changes(self):
        """Re-run dependency resolution for the packages added to the goal."""
        # The plugin may be loaded without a cli (see __init__); in that case
        # there are no demands to consult, so do not allow erasing.
        allow_erasing = (self.cli is not None
                         and self.cli.demands.allow_erasing)

        self._commiting = True
        try:
            # Get dnf's dependency manager to resolve missing deps for added pkgs
            self.base.resolve(allow_erasing)
        finally:
            # Always clear the flag: leaving it set after a failed resolve
            # would silently disable resolved() for the rest of the session.
            self._commiting = False


    def resolved(self):
        """Add kpatch-patch packages for kernels in the resolved transaction."""
        # Calling self.base.resolve() will run this callback again
        if not self._autoupdate or self._commiting:
            return

        need_kpp_for = []
        explicit_kpp_install = []
        for tr_item in self.base.transaction:
            # It might not be safe to check tr_item.pkg.name as there might be
            # some dnf internal transaction items not linked to any package.
            # Check first whether the action is a package related action
            if tr_item.action in dnf.transaction.FORWARD_ACTIONS:
                if tr_item.pkg.name == KERNEL_PKG_NAME:
                    need_kpp_for.append(tr_item.pkg)
                elif tr_item.pkg.name.startswith("kpatch-patch-"):
                    explicit_kpp_install.append(tr_item.pkg.name)

        # If the user already requested a kpatch-patch package, don't override it
        # nor conflict with it
        need_kpp_for = [pkg for pkg in need_kpp_for
                        if _kpp_name_from_kernel_pkg(pkg) not in explicit_kpp_install]
        for kernel_pkg in need_kpp_for:
            _install_kpp_pkg(self.base, kernel_pkg)

        if need_kpp_for:
            self._commit_changes()

Youez - 2016 - github.com/yon3zu
LinuXploit