|
Server : Apache/2.2.17 (Unix) mod_ssl/2.2.17 OpenSSL/0.9.8e-fips-rhel5 DAV/2 PHP/5.2.17 System : Linux localhost 2.6.18-419.el5 #1 SMP Fri Feb 24 22:47:42 UTC 2017 x86_64 User : nobody ( 99) PHP Version : 5.2.17 Disable Function : NONE Directory : /proc/22697/root/usr/lib64/python2.4/site-packages/sabayon/ |
Upload File : |
#
# Copyright (C) 2005 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
import sys
import string
import pwd
import os
import libxml2
import config
import util
import cache
import random
import ldap
import socket
defaultConf="""<profiles>
<default profile=""/>
</profiles>"""
# make sure to initialize the cache first
# this will make sure we can handle disconnection
# and initialize libxml2 environment
cache.initialize()
def dprint (fmt, *args):
util.debug_print (util.DEBUG_USERDB, fmt % args)
def get_setting (node, setting, default = None, convert_to = str):
a = node.hasProp(setting)
if a:
try:
return convert_to (a.content)
except:
raise UserDatabaseException(_("invalid type for setting %s in %s") % (setting, node.nodePath()))
return default
def expand_string (string, attrs):
res = ""
i = 0
while i < len(string):
c = string[i]
i = i + 1
if c == "%":
if i < len(string):
c = string[i]
if c != "%":
if c in attrs:
c = attrs[c]
i = i + 1
res = res + c
return res
class UserDatabaseException (Exception):
pass
class UserDatabase:
"""An encapsulation of the database which maintains an
association between users and profiles.
This database is stored by default in
$(sysconfdir)/desktop-profiles/users.xml and contains a
list of users and the profile associated with each of
those users.
A profile can be reference by either its name (in which case
the profile is stored at /etc/desktop-profiles/$(name).zip),
an absolute path or a http/file URL.
"""
def __init__ (self, db_file = None):
"""Create a UserDatabase object.
@db_file: an (optional) path which specifes the location
of the database file. If not specified, the default
location of /etc/desktop-profiles/users.xml is used.
"""
if db_file is None:
file = os.path.join (config.PROFILESDIR, "users.xml")
elif db_file[0] != '/':
file = os.path.join (config.PROFILESDIR, db_file)
else:
file = db_file
self.file = file;
self.modified = 0
dprint("New UserDatabase(%s) object\n" % self.file)
try:
self.doc = libxml2.readFile(file, None, libxml2.XML_PARSE_NOBLANKS)
# Process XInclude statements
self.doc.xincludeProcess()
except:
# TODO add fallback to last good database
dprint("failed to parse %s falling back to default conf\n" %
self.file)
self.doc = None
if self.doc == None:
self.doc = libxml2.readMemory(defaultConf, len(defaultConf),
None, None,
libxml2.XML_PARSE_NOBLANKS);
def __del__ (self):
if self.doc != None:
self.doc.freeDoc()
def __profile_name_to_location (self, profile, node):
if not profile:
return None
uri = self.__ldap_query ("locationmap", {"p":profile, "h":socket.getfqdn()})
if uri:
return uri
# do the necessary URI escaping of the profile name if needed
orig_profile = profile
try:
tmp = parseURI(profile)
except:
profile = libxml2.URIEscapeStr(profile, "/:")
# if there is a base on the node, then use
if node != None:
try:
base = node.getBase(None)
if base != None and base != "" and \
base != os.path.join (config.PROFILESDIR, "users.xml"):
# URI composition from the base
ret = libxml2.buildURI(profile, base)
if ret[0] == '/':
ret = libxml2.URIUnescapeString(ret, len(ret), None)
dprint("Converted profile name '%s' to location '%s'\n",
orig_profile, ret)
return ret
except:
pass
try:
uri = libxml2.parseURI(profile);
if uri.scheme() is None:
# it is a file path
if profile[0] != '/':
profile = os.path.join (config.PROFILESDIR, profile)
if profile[-4:] != ".zip":
profile = profile + ".zip"
else:
# TODO need to make a local copy or use the local copy
profile = profile
except:
# we really expect an URI there
profile = None
if profile[0] == '/':
profile = libxml2.URIUnescapeString(profile, len(profile), None)
dprint("Converted profile name '%s' to location '%s'\n",
orig_profile, profile)
return profile
def __open_ldap (self):
nodes = self.doc.xpathEval ("/profiles/ldap")
if len (nodes) == 0:
return None
ldap_node = nodes[0]
server = get_setting (ldap_node, "server", "localhost")
port = get_setting (ldap_node, "port", ldap.PORT, int)
l = ldap.open (server, port)
l.protocol_version = get_setting (ldap_node, "version", ldap.VERSION3, int)
l.timeout = get_setting (ldap_node, "timeout", 10, int)
bind_dn = get_setting (ldap_node, "bind_dn", "")
bind_pw = get_setting (ldap_node, "bind_pw", "")
if bind_dn != "":
l.simple_bind (bind_dn, bind_pw)
return l
def __ldap_query (self, map, replace):
nodes = self.doc.xpathEval ("/profiles/ldap/" + map)
if len (nodes) == 0:
return None
map_node = nodes[0]
l = self.__open_ldap ()
if not l:
return None
search_base = get_setting (map_node, "search_base")
query_filter = get_setting (map_node, "query_filter")
result_attribute = get_setting (map_node, "result_attribute")
scope = get_setting (map_node, "scope", "sub")
multiple_result = get_setting (map_node, "multiple_result", "first")
if search_base == None:
raise UserDatabaseException(_("No search based specified for %s"%map))
if query_filter == None:
raise UserDatabaseException(_("No query filter specified for %s"%map))
if result_attribute == None:
raise UserDatabaseException(_("No result attribute specified for %s"%map))
if scope == "sub":
scope = ldap.SCOPE_SUBTREE
elif scope == "base":
scope = ldap.SCOPE_BASE
elif scope == "one":
scope = ldap.SCOPE_ONELEVEL
else:
raise UserDatabaseException(_("Scope must be one of sub, base and one"))
query_filter = expand_string (query_filter, replace)
search_base = expand_string (search_base, replace)
results = l.search_s (search_base, scope, query_filter, [result_attribute])
if len (results) == 0:
return None
(dn, attrs) = results[0]
if not result_attribute in attrs:
return None
vals = attrs[result_attribute]
if multiple_result == "first":
val = vals[0]
elif multiple_result == "random":
val = vals[random.randint(0, len(vals)-1)]
else:
raise UserDatabaseException(_("multiple_result must be one of first and random"))
l.unbind ()
return val
def get_default_profile (self, profile_location = True):
"""Look up the default profile.
@profile_location: whether the profile location should
be returned
Return value: the location of the default profile, which
should be in a suitable form for constructing a ProfileStorage
object, or the default profile name if @profile_location is
False.
"""
default = None
try:
default = self.doc.xpathEval("/profiles/default")[0]
profile = default.prop("profile")
except:
profile = None
if not profile_location:
return profile
return self.__profile_name_to_location (profile, default)
def get_profile (self, username, profile_location = True, ignore_default = False):
"""Look up the profile for a given username.
@username: the user whose profile location should be
returned.
@profile_location: whether the profile location should
be returned
@ignore_default: don't use the default profile if
no profile is explicitly set.
Return value: the location of the profile, which
should be in a suitable form for constructing a
ProfileStorage object, or the profile name if
@profile_location is False.
"""
user = None
profile = self.__ldap_query ("profilemap", {"u":username, "h":socket.getfqdn()})
if not profile:
try:
query = "/profiles/user[@name='%s']" % username
user = self.doc.xpathEval(query)[0]
profile = user.prop("profile")
except:
profile = None
if not profile and not ignore_default:
try:
query = "/profiles/default[1][@profile]"
user = self.doc.xpathEval(query)[0]
profile = user.prop("profile")
except:
profile = None
if not profile_location:
return profile
# TODO Check the resulting file path exists
return self.__profile_name_to_location (profile, user)
def __save_as(self, filename = None):
"""Save the current version to the given filename"""
if filename == None:
filename = self.file
dprint("Saving UserDatabase to %s\n", filename)
try:
os.rename(filename, filename + ".bak")
backup = 1
except:
backup = 0
pass
try:
f = open(filename, 'w')
except:
if backup == 1:
try:
os.rename(filename + ".bak", filename)
dprint("Restore from %s.bak\n", filename)
except:
dprint("Failed to restore from %s.bak\n", filename)
raise UserDatabaseException(
_("Could not open %s for writing") % filename)
try:
f.write(self.doc.serialize("UTF-8", format=1))
f.close()
except:
if backup == 1:
try:
os.rename(filename + ".bak", filename)
dprint("Restore from %s.bak\n", filename)
except:
dprint("Failed to restore from %s.bak\n", filename)
raise UserDatabaseException(
_("Failed to save UserDatabase to %s") % filename)
self.modified = 0
def set_default_profile (self, profile):
"""Set the default profile to be used for all users.
@profile: the location of the profile.
"""
if profile is None:
profile = ""
self.modified = 0
try:
default = self.doc.xpathEval("/profiles/default")[0]
oldprofile = default.prop("profile")
if oldprofile != profile:
default.setProp("profile", profile)
self.modified = 1
except:
try:
profiles = self.doc.xpathEval("/profiles")[0]
except:
raise UserDatabaseException(
_("File %s is not a profile configuration") %
(self.file))
try:
default = profiles.newChild(None, "default", None)
default.setProp("profile", profile)
except:
raise UserDatabaseException(
_("Failed to add default profile %s to configuration") %
(profile))
self.modified = 1
if self.modified == 1:
self.__save_as()
def set_profile (self, username, profile):
"""Set the profile for a given username.
@username: the user whose profile location should be
set.
@profile: the location of the profile.
"""
if profile is None:
profile = ""
self.modified = 0
try:
query = "/profiles/user[@name='%s']" % username
user = self.doc.xpathEval(query)[0]
oldprofile = user.prop("profile")
if oldprofile != profile:
user.setProp("profile", profile)
self.modified = 1
except:
try:
profiles = self.doc.xpathEval("/profiles")[0]
except:
raise UserDatabaseException(
_("File %s is not a profile configuration") %
(self.file))
try:
user = profiles.newChild(None, "user", None)
user.setProp("name", username)
user.setProp("profile", profile)
except:
raise UserDatabaseException(
_("Failed to add user %s to profile configuration") %
(username))
self.modified = 1
if self.modified == 1:
self.__save_as()
def get_profiles (self):
"""Return the list of currently available profiles.
This is basically just list of zip files in
/etc/desktop-profiles, each without the .zip extension.
"""
list = []
try:
for file in os.listdir(config.PROFILESDIR):
if file[-4:] != ".zip":
continue
list.append(file[0:-4])
except:
dprint("Failed to read directory(%s)\n" % (config.PROFILESDIR))
# TODO: also list remote profiles as found in self.doc
return list
def get_users (self):
"""Return the list of users on the system. These should
be real users - i.e. should not include system users
like nobody, gdm, nfsnobody etc.
"""
list = []
try:
users = pwd.getpwall()
except:
raise UserDatabaseException(_("Failed to get the user list"))
for user in pwd.getpwall():
try:
# remove non-users
if user[2] < 500:
continue
if user[0] in list:
continue
if user[6] == "" or string.find(user[6], "nologin") != -1:
continue
list.append(user[0])
except:
pass
return list
user_database = None
def get_database ():
"""Return a UserDatabase singleton"""
global user_database
if user_database is None:
user_database = UserDatabase ()
return user_database
#
# Unit tests
#
def run_unit_tests ():
testfile = "/tmp/test_users.xml"
try:
os.unlink(testfile)
except:
pass
db = UserDatabase(testfile)
db.set_default_profile("default")
res = db.get_profile("localuser", False)
assert not res is None
assert res == "default"
db.set_profile("localuser", "groupA")
res = db.get_profile("localuser")
assert not res is None
assert res[-28:] == "/desktop-profiles/groupA.zip"
db.set_profile("localuser", "groupB")
res = db.get_profile("localuser")
assert not res is None
assert res[-28:] == "/desktop-profiles/groupB.zip"
if __name__ == "__main__":
util.init_gettext ()
run_unit_tests()