#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
"""PyAMS_utils.protocol.xmlrpc module
This module provides a few set of classes and functions usable to improve XML-RPC client usage.
It provides custom transports and allows storage of response cookies
"""
import base64
import http.client
import http.cookiejar
import socket
import urllib.request
import xmlrpc.client
try:
import gzip
except ImportError:
gzip = None # python can be built without zlib/gzip support
__docformat__ = 'restructuredtext'
[docs]class XMLRPCCookieAuthTransport(xmlrpc.client.Transport):
# pylint: disable=too-many-instance-attributes
"""An XML-RPC transport handling authentication via cookies"""
_http_connection = http.client.HTTPConnection
verbose = False
def __init__(self, user_agent, credentials=(), cookies=None,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT, headers=None):
# pylint: disable=protected-access,too-many-arguments
xmlrpc.client.Transport.__init__(self)
self.user_agent = user_agent
self.credentials = credentials
self.cookies = cookies
self.timeout = timeout
self.headers = headers
[docs] def request(self, host, handler, request_body, verbose=False):
self.verbose = verbose
# issue XML-RPC request
connection = self.send_request(host, handler, request_body, verbose)
# get response
return self.get_response(connection, host, handler)
[docs] def make_connection(self, host):
# This is the make_connection that runs under Python 2.7 and newer.
# The code is pulled straight from 2.7 xmlrpclib, except replacing
# HTTPConnection with self._http_connection
if self._connection and host == self._connection[0]:
return self._connection[1]
chost, self._extra_headers, _x509 = self.get_host_info(host)
self._connection = host, self._http_connection(chost, timeout=self.timeout)
return self._connection[1]
[docs] def send_request(self, host, handler, request_body, debug):
connection = self.make_connection(host)
headers = self._extra_headers[:]
if debug:
connection.set_debuglevel(1)
if self.accept_gzip_encoding and gzip:
connection.putrequest("POST", handler, skip_accept_encoding=True)
headers.append(("Accept-Encoding", "gzip"))
else:
connection.putrequest("POST", handler)
self.send_auth(connection)
self.send_content_type(connection)
self.send_user_agent(connection)
self.send_headers(connection, headers)
self.send_content(connection, request_body)
return connection
[docs] def send_auth(self, connection):
"""Override the send_host hook to also send authentication info"""
if (self.cookies is not None) and (len(self.cookies) > 0):
for cookie in self.cookies:
connection.putheader('Cookie', '%s=%s' % (cookie.name, cookie.value))
elif self.credentials:
creds = base64.encodebytes(("%s:%s" % self.credentials).encode()).strip().decode()
auth = 'Basic %s' % creds
connection.putheader('Authorization', auth)
[docs] @staticmethod
def send_content_type(connection):
"""Send content type"""
connection.putheader('Content-Type', 'text/xml')
[docs] def send_user_agent(self, connection):
"""Send user agent"""
connection.putheader('User-Agent', self.user_agent)
[docs] class CookieRequest(urllib.request.Request):
"""Dummy request class used for extracting cookies"""
[docs] class CookieResponseHelper:
"""Dummy response headers helper"""
def __init__(self, response):
self.response = response
[docs] class CookieResponse:
"""Dummy response class used to extract cookies"""
def __init__(self, response):
self.response = response
[docs] def info(self):
"""Get response info from cookies"""
return XMLRPCCookieAuthTransport.CookieResponseHelper(self.response)
[docs] def get_response(self, connection, host, handler):
"""Get server response"""
response = connection.getresponse()
# extract cookies from response headers
if self.cookies is not None:
crequest = XMLRPCCookieAuthTransport.CookieRequest('http://%s/' % host)
cresponse = XMLRPCCookieAuthTransport.CookieResponse(response)
for cookie in self.cookies.make_cookies(cresponse, crequest):
if cookie.name.startswith('Set-Cookie'):
cookie.name = cookie.name.split(': ', 1)[1]
self.cookies.set_cookie(cookie)
if response.status != 200:
raise xmlrpc.client.ProtocolError(host + handler, response.status, response.reason,
response.getheaders())
return self.parse_response(response)
[docs]class SecureXMLRPCCookieAuthTransport(XMLRPCCookieAuthTransport):
"""Secure XML-RPC transport"""
_http_connection = http.client.HTTPSConnection
[docs]def get_client(uri, credentials=(), verbose=False, allow_none=0,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT, headers=None):
# pylint: disable=protected-access,too-many-arguments
"""Get an XML-RPC client which supports basic authentication"""
if uri.startswith('https:'):
transport = SecureXMLRPCCookieAuthTransport(
'Python XML-RPC Client/0.1 (PyAMS secure transport)', credentials,
timeout=timeout, headers=headers)
else:
transport = XMLRPCCookieAuthTransport('Python XML-RPC Client/0.1 (PyAMS basic transport)',
credentials, timeout=timeout, headers=headers)
return xmlrpc.client.Server(uri, transport=transport, verbose=verbose, allow_none=allow_none)
[docs]def get_client_with_cookies(uri, credentials=(), verbose=False, allow_none=0,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT, headers=None, cookies=None):
# pylint: disable=protected-access,too-many-arguments
"""Get an XML-RPC client which supports authentication through cookies"""
if cookies is None:
cookies = http.cookiejar.CookieJar()
if uri.startswith('https:'):
transport = SecureXMLRPCCookieAuthTransport(
'Python XML-RPC Client/0.1 (PyAMS secure cookie transport)',
credentials, cookies, timeout, headers)
else:
transport = XMLRPCCookieAuthTransport(
'Python XML-RPC Client/0.1 (PyAMS basic cookie transport)',
credentials, cookies, timeout, headers)
return xmlrpc.client.Server(uri, transport=transport, verbose=verbose, allow_none=allow_none)