|
Server : Apache/2.2.17 (Unix) mod_ssl/2.2.17 OpenSSL/0.9.8e-fips-rhel5 DAV/2 PHP/5.2.17 System : Linux localhost 2.6.18-419.el5 #1 SMP Fri Feb 24 22:47:42 UTC 2017 x86_64 User : nobody ( 99) PHP Version : 5.2.17 Disable Function : NONE Directory : /proc/21571/root/usr/share/doc/m2crypto-0.16/tests/ |
Upload File : |
#!/usr/bin/python
"""Unit tests for M2Crypto.SSL.
Copyright (c) 2000-2004 Ng Pheng Siong. All rights reserved."""
"""
TODO
Server tests:
- ???
Others:
- ssl_dispatcher
- SSLServer
- ForkingSSLServer
- ThreadingSSLServer
"""
import os, socket, string, sys, tempfile, thread, time, unittest
from M2Crypto import Rand, SSL, m2
srv_host = 'localhost'
srv_port = 64000
def verify_cb_new_function(ok, store):
try:
assert not ok
err = store.get_error()
assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \
err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \
err == m2.X509_V_ERR_CERT_UNTRUSTED or \
err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
app_data = m2.x509_store_ctx_get_app_data(store.ctx)
assert app_data
except AssertionError, e:
# If we let exceptions propagate from here the
# caller may see strange errors. This is cleaner.
return 0
return 1
class VerifyCB:
def __call__(self, ok, store):
return verify_cb_new_function(ok, store)
class SSLClientTestCase(unittest.TestCase):
def start_server(self, args):
pid = os.fork()
if pid == 0:
os.execvp('openssl', args)
else:
time.sleep(0.5)
return pid
def stop_server(self, pid):
os.kill(pid, 1)
os.waitpid(pid, 0)
def http_get(self, s):
s.send('GET / HTTP/1.0\n\n')
resp = ''
while 1:
try:
r = s.recv(4096)
if not r:
break
except SSL.SSLError: # s_server throws an 'unexpected eof'...
break
resp = resp + r
return resp
def setUp(self):
self.srv_host = srv_host
self.srv_port = srv_port
self.srv_addr = (srv_host, srv_port)
self.srv_url = 'https://%s:%s/' % (srv_host, srv_port)
self.args = ['s_server', '-quiet', '-www',
#'-cert', 'server.pem', Implicitly using this
'-accept', str(self.srv_port)]
def tearDown(self):
global srv_port
srv_port = srv_port - 1
def test_no_connection(self):
ctx = SSL.Context()
s = SSL.Connection(ctx)
def test_server_simple(self):
pid = self.start_server(self.args)
try:
self.assertRaises(ValueError, SSL.Context, 'tlsv5')
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_tls1_nok(self):
self.args.append('-no_tls1')
pid = self.start_server(self.args)
try:
ctx = SSL.Context('tlsv1')
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
self.failUnlessEqual(e[0], 'wrong version number')
s.close()
finally:
self.stop_server(pid)
def test_tls1_ok(self):
self.args.append('-tls1')
pid = self.start_server(self.args)
try:
ctx = SSL.Context('tlsv1')
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_sslv23_no_v2(self):
self.args.append('-no_tls1')
pid = self.start_server(self.args)
try:
ctx = SSL.Context('sslv23')
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
self.failUnlessEqual(s.get_version(), 'SSLv3')
s.close()
finally:
self.stop_server(pid)
def test_sslv23_no_v2_no_service(self):
self.args = self.args + ['-no_tls1', '-no_ssl3']
pid = self.start_server(self.args)
try:
ctx = SSL.Context('sslv23')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_sslv23_weak_crypto(self):
self.args = self.args + ['-no_tls1', '-no_ssl3']
pid = self.start_server(self.args)
try:
ctx = SSL.Context('sslv23', weak_crypto=1)
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
self.failUnlessEqual(s.get_version(), 'SSLv2')
s.close()
finally:
self.stop_server(pid)
def test_cipher_mismatch(self):
self.args = self.args + ['-cipher', 'EXP-RC4-MD5']
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.set_cipher_list('EXP-RC2-CBC-MD5')
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')
s.close()
finally:
self.stop_server(pid)
def test_no_such_cipher(self):
self.args = self.args + ['-cipher', 'EXP-RC4-MD5']
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.set_cipher_list('EXP-RC2-MD5')
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
self.failUnlessEqual(e[0], 'no ciphers available')
s.close()
finally:
self.stop_server(pid)
def test_no_weak_cipher(self):
self.args = self.args + ['-cipher', 'EXP']
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')
s.close()
finally:
self.stop_server(pid)
def test_use_weak_cipher(self):
self.args = self.args + ['-cipher', 'EXP']
pid = self.start_server(self.args)
try:
ctx = SSL.Context(weak_crypto=1)
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_cipher_ok(self):
self.args = self.args + ['-cipher', 'EXP-RC4-MD5']
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.set_cipher_list('EXP-RC4-MD5')
s.connect(self.srv_addr)
data = self.http_get(s)
cipher_stack = s.get_ciphers()
assert cipher_stack[0].name() == 'EXP-RC4-MD5', cipher_stack[0].name()
self.assertRaises(IndexError, cipher_stack.__getitem__, 2)
# For some reason there are 2 entries in the stack
#assert len(cipher_stack) == 1, len(cipher_stack)
assert s.get_cipher_list() == 'EXP-RC4-MD5', s.get_cipher_list()
# Test Cipher_Stack iterator
i = 0
for cipher in cipher_stack:
i += 1
assert cipher.name() == 'EXP-RC4-MD5', '"%s"' % cipher.name()
# For some reason there are 2 entries in the stack
#assert i == 1, i
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def verify_cb_new(self, ok, store):
return verify_cb_new_function(ok, store)
def test_verify_cb_new(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
self.verify_cb_new)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cb_new_class(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
VerifyCB())
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cb_new_function(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
verify_cb_new_function)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cb_lambda(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
lambda ok, store: 1)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def verify_cb_exception(self, ok, store):
raise Exception, 'We should fail verification'
def test_verify_cb_exception(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
self.verify_cb_exception)
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_verify_cb_not_callable(self):
ctx = SSL.Context()
self.assertRaises(TypeError,
ctx.set_verify,
SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
9,
1)
def test_verify_cb_wrong_callable(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
lambda _: '')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):
try:
from M2Crypto import X509
assert not ok
assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \
err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \
err == m2.X509_V_ERR_CERT_UNTRUSTED or \
err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
assert m2.ssl_ctx_get_cert_store(ctx_ptr)
assert X509.X509(x509_ptr).as_pem()
except AssertionError:
# If we let exceptions propagate from here the
# caller may see strange errors. This is cleaner.
return 0
return 1
def test_verify_cb_old(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
self.verify_cb_old)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_allow_unknown_old(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
SSL.cb.ssl_verify_callback)
ctx.set_allow_unknown_ca(1)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_allow_unknown_new(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
SSL.cb.ssl_verify_callback_allow_unknown_ca)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cert(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('ca.pem')
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cert_fail(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('server.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_verify_cert_mutual_auth(self):
self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('ca.pem')
ctx.load_cert('x509.pem')
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cert_mutual_auth_servernbio(self):
self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('ca.pem')
ctx.load_cert('x509.pem')
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cert_mutual_auth_fail(self):
self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('ca.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_verify_nocert_fail(self):
self.args.extend(['-nocert'])
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('ca.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_HTTPSConnection(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
c = httpslib.HTTPSConnection(srv_host, srv_port)
c.request('GET', '/')
data = c.getresponse().read()
c.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_HTTPS(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
c = httpslib.HTTPS(srv_host, srv_port)
c.putrequest('GET', '/')
c.putheader('Accept', 'text/html')
c.putheader('Accept', 'text/plain')
c.endheaders()
err, msg, headers = c.getreply()
assert err == 200, err
f = c.getfile()
data = f.read()
c.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_urllib(self):
pid = self.start_server(self.args)
try:
from M2Crypto import m2urllib
url = m2urllib.FancyURLopener()
url.addheader('Connection', 'close')
u = url.open('https://%s:%s/' % (srv_host, srv_port))
data = u.read()
u.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
# XXX Don't actually know how to use m2urllib safely!
#def test_urllib_safe_context(self):
#def test_urllib_safe_context_fail(self):
def test_urllib2(self):
pid = self.start_server(self.args)
try:
from M2Crypto import m2urllib2
opener = m2urllib2.build_opener()
opener.addheaders = [('Connection', 'close')]
u = opener.open('https://%s:%s/' % (srv_host, srv_port))
data = u.read()
u.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_urllib2_secure_context(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('ca.pem')
from M2Crypto import m2urllib2
opener = m2urllib2.build_opener(ctx)
opener.addheaders = [('Connection', 'close')]
u = opener.open('https://%s:%s/' % (srv_host, srv_port))
data = u.read()
u.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_urllib2_secure_context_fail(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('server.pem')
from M2Crypto import m2urllib2
opener = m2urllib2.build_opener(ctx)
opener.addheaders = [('Connection', 'close')]
self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, srv_port))
finally:
self.stop_server(pid)
def test_blocking0(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.setblocking(0)
self.assertRaises(Exception, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_blocking1(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.setblocking(1)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_timeout(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
# Just a really small number so we can timeout
s.settimeout(0.000000000000000000000000000001)
self.assertRaises(SSL.SSLTimeoutError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_makefile_timeout(self):
# httpslib uses makefile to read the response
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
c = httpslib.HTTPS(srv_host, srv_port)
c.putrequest('GET', '/')
c.putheader('Accept', 'text/html')
c.putheader('Accept', 'text/plain')
c.endheaders()
c._conn.sock.settimeout(100)
err, msg, headers = c.getreply()
assert err == 200, err
f = c.getfile()
data = f.read()
c.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_makefile_timeout_fires(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
c = httpslib.HTTPS(srv_host, srv_port)
c.putrequest('GET', '/')
c.putheader('Accept', 'text/html')
c.putheader('Accept', 'text/plain')
c.endheaders()
c._conn.sock.settimeout(0.0000000001)
self.assertRaises(socket.timeout, c.getreply)
c.close()
finally:
self.stop_server(pid)
def test_twisted_wrapper(self):
#
# LEAK ALERT!
# If this method is commented out, the leak detection code does
# not find any leaks.
#
# Test only when twisted and ZopeInterfaces are present
try:
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
import M2Crypto.SSL.TwistedProtocolWrapper as wrapper
except ImportError:
import warnings
warnings.warn('Skipping twisted wrapper test because twisted not found')
return
class EchoClient(LineReceiver):
def connectionMade(self):
self.sendLine('GET / HTTP/1.0\n\n')
def lineReceived(self, line):
global twisted_data
twisted_data += line
class EchoClientFactory(ClientFactory):
protocol = EchoClient
def clientConnectionFailed(self, connector, reason):
reactor.stop()
assert 0, reason
def clientConnectionLost(self, connector, reason):
reactor.stop()
pid = self.start_server(self.args)
class ContextFactory:
def getContext(self):
return SSL.Context()
try:
global twisted_data
twisted_data = ''
contextFactory = ContextFactory()
factory = EchoClientFactory()
wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)
reactor.run() # This will block until reactor.stop() is called
finally:
self.stop_server(pid)
self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)
twisted_data = ''
class CheckerTestCase(unittest.TestCase):
def test_checker(self):
from M2Crypto.SSL import Checker
from M2Crypto import X509
check = Checker.Checker(host=srv_host,
peerCertHash='9594D272A975F58F4430511D15B4B7FF3D778113')
x509 = X509.load_cert('server.pem')
assert check(x509, srv_host)
self.assertRaises(Checker.WrongHost, check, x509, 'example.com')
import doctest
doctest.testmod(Checker)
class ContextTestCase(unittest.TestCase):
def test_ctx_load_verify_locations(self):
ctx = SSL.Context()
self.assertRaises(AssertionError, ctx.load_verify_locations, None, None)
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(CheckerTestCase))
suite.addTest(unittest.makeSuite(ContextTestCase))
suite.addTest(unittest.makeSuite(SSLClientTestCase))
return suite
def zap_servers():
s = 's_server'
fn = tempfile.mktemp()
cmd = 'ps | egrep %s > %s' % (s, fn)
os.system(cmd)
f = open(fn)
while 1:
ps = f.readline()
if not ps:
break
chunk = string.split(ps)
pid, cmd = chunk[0], chunk[4]
if cmd == s:
os.kill(int(pid), 1)
f.close()
os.unlink(fn)
if __name__ == '__main__':
report_leaks = 0
if report_leaks:
import gc
gc.enable()
gc.set_debug(gc.DEBUG_LEAK & ~gc.DEBUG_SAVEALL)
try:
Rand.load_file('../randpool.dat', -1)
unittest.TextTestRunner().run(suite())
Rand.save_file('../randpool.dat')
finally:
zap_servers()
if report_leaks:
import alltests
alltests.dump_garbage()