|
Server : Apache/2.2.17 (Unix) mod_ssl/2.2.17 OpenSSL/0.9.8e-fips-rhel5 DAV/2 PHP/5.2.17 System : Linux localhost 2.6.18-419.el5 #1 SMP Fri Feb 24 22:47:42 UTC 2017 x86_64 User : nobody ( 99) PHP Version : 5.2.17 Disable Function : NONE Directory : /proc/21585/root/usr/share/system-config-services/ |
Upload File : |
"""The servicemethods module handles all the backend processing for the system-config-services application."""
# serviceactions.py
# Copyright (C) 2002 - 2006 Red Hat, Inc.
# Authors:
# Tim Powers <timp@redhat.com>
# Dan Walsh <dwalsh@redhat.com>
# Brent Fox <bfox@redhat.com>
# Nils Philippsen <nphilipp@redhat.com>
# Florian Festi <ffesti@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import re, os
from rhpl.translate import _, N_, cat
import copy
import nonblockingreader
def getstatusoutput(cmd, callback):
"""Return (status, output) of executing cmd in a shell."""
pipe = os.popen("{ %s ; } 2>&1" % (cmd), 'r')
output = nonblockingreader.Reader ().run ([pipe], callback)
text = output[pipe]
status = pipe.close ()
if status is None: status = 0
if text[-1:] == '\n':
text = text[:-1]
return status, text
# ============================================================================
UNKNOWN = 0
RUNNING = 1
STOPPED = 2
ERROR = 4
class Service:
"""Represents a service with start script in /etc/init.d
"""
def __init__(self, name, runlevels, services):
self.name = name
self._runlevels = runlevels
self._runlevels_old = runlevels[:]
self.services = services
self.uicallback = services.uicallback
self.startprio = None
self.stopprio = None
self.defaultrunlevels = []
self.hide = False
self.description = ""
self.processnames = []
self.configfiles = []
self.pidfiles = []
self.probe = False
self._read_script_file()
self._dirty = False
def __getattr__(self, name):
# sloppy evaluate the status as it takes some time
if name in ("status", "status_message"):
self.status, self.status_message = self.get_status()
if name == "status":
return self.status
else:
return self.status_message
else:
raise AttributeError, name
def is_xinetd_service(self):
return False
# ----
def get_runlevels(self):
return self._runlevels
# ----
def is_default(self):
"""return if service is configured as suggested with defaultsrunlevels
This is either service is off in all runlevels or on on exatctly
the defaultrunlevels
"""
if not self.defaultrunlevels:
return False
on = self._runlevels[self.defaultrunlevels[0]]
for runlevel in xrange(len(self._runlevels)):
if runlevel not in self.defaultrunlevels:
if self._runlevels[runlevel]: # non default switched on
return False
elif self._runlevels[runlevel] != on:
# default runlevel != the others
return False
return True
# ----
def is_default_on(self):
"""
Return if service is enabled in the default runlevels
Assumes .is_default() == True!!!"""
return self._runlevels[self.defaultrunlevels[0]]
# ----
def get_status(self):
"""Execute /sbin/service name status
return (status of the service, the output of the command)
"""
status = UNKNOWN
try:
message = getstatusoutput("LC_ALL=C /sbin/service " + self.name + " status", self.uicallback)[1]
except:
return (UNKNOWN, "")
if message.find("running")!=-1:
status = RUNNING
if message.find("stopped")!=-1:
status = STOPPED
return (status, message)
# ----
def get_script_file(self):
return "/etc/init.d/" + self.name
# ====
def set_dirty(self):
"""Tell that a config file got changed"""
if self.state != STOPPED:
self._dirty = True
# ----
def is_dirty(self):
"""Are there still changes not realized by the service yet"""
return self._dirty
# =====
def set_in_runlevels(self, on, editing_runlevels=0):
"""changed if service is started in the given runlevels
runlevels may be an int or sequence of ints
changes are not written to disk. Use .save_changes()"""
if isinstance(editing_runlevels, int):
editing_runlevels = [editing_runlevels]
for runlevel in editing_runlevels:
self._runlevels[runlevel] = on
self.services._service_changed(self)
# ----
def _set_in_runlevels(self, on, editing_runlevels):
"""calls chkconfig --level editing_runlevel on if on, off if not on"""
chkconfig_action = ("on", "off")[not on]
try:
getstatusoutput("LC_ALL=C /sbin/chkconfig --level %s %s %s" % (editing_runlevels, self.name, chkconfig_action), self.uicallback)
except IOError:
pass
# ----
def set(self, on):
"""Change is service is started in the default runlevels"""
for runlevel in self.defaultrunlevels:
self._runlevels[runlevel] = on
self.services._service_changed(self)
# ----
def is_changed(self):
"""Are there any changes to runlevel behaviour that
are not yet written to disk"""
for new, old in zip(self._runlevels, self._runlevels_old):
if new != old:
return True
return False
# ----
def save_changes(self):
"""when this method is used it saves the change to disk"""
changed = False
runlevel = 0
for new, old in zip(self._runlevels, self._runlevels_old):
if new ^ old:
self._set_in_runlevels(new, runlevel)
changed = True
runlevel += 1
self._runlevels_old = self._runlevels[:]
return changed
# ----
def action(self, action):
"""Execute /sbin/service servicename action.
return (errorcode, message)"""
status, message = getstatusoutput("/sbin/service %s %s" %
(self.name, action),
self.uicallback)
if status != 0:
if action in ('start', 'restart', 'reload', 'condrestart'):
self.status = ERROR
self.message = message
return (1, _("%s failed. The error was: %s") %
(self.name, message))
else:
self.status, self.status_message = self.get_status()
self._dirty = False
return (0,"%s %s" % (self.name, action) +
_(" successful"))
# ----
def _read_script_file(self):
"""Gets the description for the given initscript or xinet.d script"""
try:
fd = open(self.get_script_file())
except IOError:
return
tag_names = ["chkconfig", "description", "processname", "config",
"pidfile", "probe", "hide",
"default"] # xinetd
# "description[ln]" still missing
data = []
tag = None
for line in fd:
if line[0]!="#":
break
line = line[1:].strip()
if not tag:
match = re.match(r"^\s*(\w+):", line)
if match and match.group(1) in tag_names:
tag = match.group(1)
line = line[match.end():]
if tag:
if line.endswith("\\"):
data.append(line[:-1].strip())
else:
data.append(line.strip())
self._process_initscript_tag(tag, " ".join(data))
tag = None
data = []
def _process_initscript_tag(self, name, value):
"""used by _read_script_file only"""
if name == "chkconfig":
match = re.match(r"\s*(\d+|-)\s*(\d+)\s*(\d+)", value)
if match:
if match.group(1) != "-":
for char in match.group(1):
self.defaultrunlevels.append(int(char))
self.startprio = int(match.group(2))
self.stopprio = int(match.group(3))
elif name == "default":
if value.find("on") != -1:
self.defaultrunlevels.append(0)
elif name == "description":
self.description = value
elif name == "processname":
self.processnames.append(value)
elif name == "config":
self.configfiles.append(value)
elif name == "pidfile":
self.pidfiles.append(value)
elif name == "probe":
self.probe = value.find("true") != -1
elif name == "hide":
self.hide = value.find("true") != -1
def revert (self):
self._runlevels = copy.copy (self._runlevels_old)
# ============================================================================
class XinetdService(Service):
def is_xinetd_service(self):
return True
def get_status(self):
xinetd = self.services.get('xinetd', None)
if xinetd:
return xinetd.get_status()
else:
return (UNKNOWN, "")
def get_script_file(self):
return "/etc/xinetd.d/" + self.name
# ----
def _set_in_runlevels(self, on, editing_runlevels=None):
"""calls chkconfig on/off"""
chkconfig_action = ("on", "off")[not on]
try:
getstatusoutput("LC_ALL=C /sbin/chkconfig %s %s" %
(self.name, chkconfig_action), self.uicallback)
except IOError:
pass
# ----
def action(self, action):
"""starts, stops, and restarts the service. Returns the error
if the service failed in any of the actions.
"""
xinetd = self.services._services.get('xinetd', None)
if xinetd:
if xinetd.status != RUNNING:
return (1, _("xinetd must be enabled for %s to run") %
self.name)
error, output = xinetd.action('reload')
if error != 0:
return (1,_("xinetd failed to reload for ") +
self.name +
_(". The error was: ") + output)
else:
return (0,_("xinetd reloaded %s successfully") % self.name)
else:
return (1, _("xinetd must be enabled for %s to run") %
self.name)
# ============================================================================
class Services:
"""Includes methods used to find services, and information about them such
as the description, whether or not it is configured etc."""
def __init__(self, uicallback = None):
self.uicallback = uicallback
self._services = { }
self._xinetd_services = { }
self._changed = { }
# ----
def __getitem__(self, name):
return self._services.get(name, None) or self._xinetd_services[name]
# ----
def has_key(self, name):
return (self._services.has_key(name) or
self._xinetd_services.has_key(name))
# ----
def __iter__(self):
"""Iter over all non xinetd services"""
keys = self._services.keys()
keys.sort()
for key in keys:
yield self._services[key]
# ----
def xinetd_services(self):
"""return iterator over all xinetd services"""
keys = self._xinetd_services.keys()
keys.sort()
for key in keys:
yield self._xinetd_services[key]
# ----
def get_runlevel(self):
"""returns the current runlevel, uses /sbin/runlevel"""
status, output = getstatusoutput("/sbin/runlevel", self.uicallback)
# This is the current runlevel
return int(output[2])
# ----
def add_service(self, servicename):
"""calls chkconfig --add servicename"""
status, output = getstatusoutput("LC_ALL=C /sbin/chkconfig --add %s" %
(servicename), self.uicallback)
if status == 0:
self.update(servicename)
return status, output
# ----
def delete_service(self, servicename):
"""calls chkconfig --del servicename"""
result = getstatusoutput("LC_ALL=C /sbin/chkconfig --del %s" %
(servicename), self.uicallback)
self.update(servicename)
return result
# ----
def _service_from_chkconfig(self, line):
"""parse one line of chkconfig --list output
return (name, runlevels as list of bools)
or None if not a valid line"""
line = line.strip()
if not line: return None
entries = line.split("\t")
entries = [entry.strip() for entry in entries]
if entries[0].endswith(":"):
name = entries[0][:-1]
else:
name = entries[0]
if name == "xinetd based services": return None
entries = entries[1:]
runlevels = [entry.split(":")[-1]=="on" for entry in entries]
return name, runlevels
# ----
def get_service_lists (self, servicename=''):
"""read in data about services in the system.
if servicename is given only get data for this service
"""
self._services = {}
self._xinetd_services = {}
chkconfig_list = getstatusoutput(
"LC_ALL=C /sbin/chkconfig --list %s 2>/dev/null" % servicename,
self.uicallback)[1]
chkconfig_list = chkconfig_list.splitlines()
for line in chkconfig_list:
service = self._service_from_chkconfig(line)
if service is None:
continue
name, runlevels = service
if len(runlevels) == 1:
service = XinetdService(name, runlevels, self)
self._xinetd_services[name] = service
else:
service = Service(name, runlevels, self)
self._services[name] = service
if servicename and self._xinetd_services:
#if only one xinetd service loaded load also xinetd
self.update("xinetd")
# ----
def update(self, servicename):
"""Read status of service from disk. If service got removed
remove from ourselfs"""
status, output = getstatusoutput("LC_ALL=C /sbin/chkconfig --list " +
servicename, self.uicallback)
if status == 0:
service = self._service_from_chkconfig(output)
if service is not None:
name, runlevels = service
if len(runlevels) == 1:
service = XinetdService(name, runlevels, self)
self._xinetd_services[name] = service
else:
service = Service(name, runlevels, self)
self._services[name] = service
return
# service not found, remove if exists in memory
self._xinetd_services.pop(servicename, None)
self._services.pop(servicename, None)
# ----
def is_changed(self):
"""Runlevel behaviour of any service got changed
but not yet written to disk"""
return bool(self._changed)
# ----
def _service_changed(self, service):
"""Called by a service that got changed"""
if service.is_changed():
self._changed[service] = 1
else:
self._changed.pop(service, None)
# ----
def save_changes(self):
"""when this method is used it saves the change to disk"""
for service in self._services.itervalues():
service.save_changes()
reload_xinetd = False
for service in self._xinetd_services.itervalues():
changed = service.save_changes()
reload_xinetd = reload_xinetd or changed
if reload_xinetd:
getstatusoutput("/sbin/service xinetd reload", self.uicallback)
self._changed = { }
def revert (self):
for service in self._services.itervalues ():
service.revert ()
for service in self._xinetd_services.itervalues ():
service.revert ()