Source code for cassette.cassette_library

from __future__ import absolute_import
import hashlib
import logging
import os
import sys
from urlparse import urlparse

from cassette.config import Config
from cassette.http_response import MockedHTTPResponse
from cassette.utils import Encoder

log = logging.getLogger("cassette")


def _hash(content):
    m = hashlib.md5()
    m.update(content)
    return m.digest()


[docs]class CassetteName(unicode): """ A CassetteName represents an unique way to retrieve the cassette from the library. """ @classmethod
[docs] def from_httplib_connection(cls, host, port, method, url, body, headers, will_hash_body=False): """Create an object from an httplib request.""" if headers: if 'Host' in headers: # 'Host' is already covered explicitly below, remove from the # hash so hostname/post are easy to change and sed the file del headers['Host'] headers = hashlib.md5(repr(sorted(headers.items()))).hexdigest() if will_hash_body: if body: body = hashlib.md5(body).hexdigest() else: body = '' if url: # instantiated to 0.0.0.0 so we can parse parsed_url = urlparse('http://0.0.0.0' + url) url = parsed_url.path query = hashlib.md5(parsed_url.query + '#' + parsed_url.fragment).hexdigest() else: # requests/urllib3 defaults to '/' while urllib2 is ''. So this # value should be '/' to ensure compatability. url = '/' query = '' name = ("httplib:{method} {host}:{port}{url} {query} " "{headers} {body}").format(**locals()) else: # note that old yaml files will not contain the correct matching # query and body name = ("httplib:{method} {host}:{port}{url} " "{headers} {body}").format(**locals()) name = name.strip() return name
class CassetteLibrary(object):
    """
    The CassetteLibrary holds the stored requests and manages them.

    This is an abstract class that needs to have several methods implemented.
    In addition, subclasses must store request/response pairs as keys and
    values of a dictionary in the property `self.data`.

    :param str filename: filename to use when storing and replaying requests.
    :param Encoder encoder: the instantiated encoder to use
    :param config: optional configuration; when falsy, the result of
        :meth:`get_default_config` is used instead.
    """

    # Class-level cache shared by every library instance. `save_to_cache`
    # stores {'hash': ..., 'data': ...} entries keyed by `self.filename`.
    cache = {}

    def __init__(self, filename, encoder, config=None):
        self.filename = os.path.abspath(filename)
        self.is_dirty = False
        self.used = set()
        self.config = config or self.get_default_config()
        self.encoder = encoder

    def get_default_config(self):
        """Return the configuration used when none is passed in."""
        return Config()

    def add_response(self, cassette_name, response):
        """Add a new response to the mocked response.

        :param str cassette_name: key under which the response is stored
        :param response: response to convert with
            ``MockedHTTPResponse.from_response``
        :returns: the mocked response that was stored
        :raises TypeError: if ``cassette_name`` is empty
        """
        if not cassette_name:
            raise TypeError("No cassette name provided.")

        mocked = MockedHTTPResponse.from_response(response)
        self.data[cassette_name] = mocked

        # Mark the cassette changes as dirty for ejection
        self.is_dirty = True

        return mocked

    def save_to_cache(self, file_hash, data):
        """Save a decoded data object into cache, keyed by `self.filename`."""
        CassetteLibrary.cache[self.filename] = {
            'hash': file_hash,
            'data': data
        }

    def rewind(self):
        """Restore all responses to a re-seekable state."""
        for response in self.data.values():
            response.rewind()

    def _had_response(self):
        """Mark that the library already had the response.

        This is for testing purposes (it's complicated to patch the unpatched
        version of urllib2/httplib).
        """
        pass

    def cassette_name_for_httplib_connection(self, host, port, method, url,
                                             body, headers):
        """Create a cassette name from an httplib request."""
        return CassetteName.from_httplib_connection(
            host, port, method, url, body, headers)

    def _log_contains(self, cassette_name, contains):
        """Logging for checking access to cassettes."""
        if contains:
            self._had_response()  # For testing purposes
            log.info('Library has %s', cassette_name)
        else:
            log.info('Library does not have %s', cassette_name)

    def log_cassette_used(self, path):
        """Record that a path was used (only if `log_cassette_used` is on)."""
        if self.config['log_cassette_used']:
            self.used.add(path)

    def report_unused_cassettes(self, output=None):
        """Write the available-but-unused paths to a file-like object.

        :param output: object with a ``write`` method; defaults to the
            current ``sys.stdout``, resolved at call time so that a replaced
            or captured stdout is honoured.
        :raises ValueError: if the `log_cassette_used` option is disabled
        """
        if not self.config['log_cassette_used']:
            raise ValueError('Need to activate log_cassette_used first.')

        if output is None:
            output = sys.stdout

        available = set(self.get_all_available())
        unused = available.difference(self.used)
        # Sorted so the report is deterministic from run to run.
        output.write('\n'.join(sorted(unused)))

    # Methods that need to be implemented by subclasses

    def write_to_file(self):
        """Write the response data to file."""
        raise NotImplementedError('CassetteLibrary not implemented.')

    def get_all_available(self):
        """Return all available cassettes."""
        raise NotImplementedError('CassetteLibrary not implemented.')

    def __contains__(self, cassette_name):
        raise NotImplementedError('CassetteLibrary not implemented.')

    def __getitem__(self, cassette_name):
        raise NotImplementedError('CassetteLibrary not implemented.')

    @classmethod
    def create_new_cassette_library(cls, path, file_format, config=None):
        """Return an instantiated CassetteLibrary.

        Use this method to create a new CassetteLibrary. It will
        automatically determine if it should use a file or directory to back
        the cassette based on the filename. The method assumes that all file
        names with an extension (e.g. ``/file.json``) are files, and all file
        names without extensions are directories (e.g. ``/requests``).

        :param str path: filename of file or directory for storing requests
        :param str file_format: the file_format to use for storing requests;
            when falsy, the encoder is chosen from the extension of ``path``
        :param dict config: configuration
        :raises KeyError: if ``Encoder.is_supported_format`` rejects
            ``file_format``
        :raises IOError: if ``path`` has an extension but is a directory, or
            has no extension but is an existing file
        """
        if not Encoder.is_supported_format(file_format):
            raise KeyError('%r is not a supported file_format.' % file_format)

        _, extension = os.path.splitext(path)

        if file_format:
            encoder = Encoder.get_encoder_from_file_format(file_format)
        else:
            encoder = Encoder.get_encoder_from_extension(extension)

        # Check if file has extension
        if extension:
            if os.path.isdir(path):
                raise IOError('Expected a file, but found a directory at %s'
                              % path)
            klass = FileCassetteLibrary
        else:
            if os.path.isfile(path):
                raise IOError('Expected a directory, but found a file at %r'
                              % path)
            klass = DirectoryCassetteLibrary

        return klass(path, encoder, config)
class FileCassetteLibrary(CassetteLibrary):
    """Store and manage requests with a single file."""

    @property
    def data(self):
        """Lazily loaded data (read from file on first access)."""
        if not hasattr(self, "_data"):
            self._data = self.load_file()
        return self._data

    def write_to_file(self):
        """Write mocked responses to file using the configured encoder."""
        # Serialize the items with the encoder
        data = {k: v.to_dict() for k, v in self.data.items()}
        encoded_str = self.encoder.dump(data)

        # Save the changes to file
        with open(self.filename, "w+") as f:
            f.write(encoded_str)

        # Update our hash
        self.save_to_cache(file_hash=_hash(encoded_str), data=self.data)

        self.is_dirty = False

    def load_file(self):
        """Load the mocked responses stored in `self.filename`.

        :returns: dictionary mapping cassette names to mocked responses;
            empty if the file does not exist. When the class-level cache
            holds an entry for this file with a matching content hash, the
            cached dictionary is returned instead of decoding the file again.
        """
        data = {}
        filename = self.filename

        if not os.path.exists(filename):
            log.info("File '%s' does not exist.", filename)
            return data

        # Open and read in the file
        with open(filename) as f:
            encoded_str = f.read()

        encoded_hash = _hash(encoded_str)

        # If the contents are cached, return them
        cached_result = CassetteLibrary.cache.get(filename, None)
        if cached_result and cached_result['hash'] == encoded_hash:
            return cached_result['data']

        # Otherwise, parse the contents
        content = self.encoder.load(encoded_str)

        if content:
            for k, v in content.items():
                data[k] = MockedHTTPResponse.from_dict(v)

        # Cache the file for later
        self.save_to_cache(file_hash=encoded_hash, data=data)

        return data

    def __contains__(self, cassette_name):
        contains = cassette_name in self.data
        self._log_contains(cassette_name, contains)
        return contains

    def __getitem__(self, cassette_name):
        """Return the request from the loaded dictionary data.

        The response is rewound before being returned.

        :raises KeyError: if no response is stored under ``cassette_name``
        """
        req = self.data.get(cassette_name, None)

        # Compare against None explicitly so that a stored response is never
        # mistaken for a missing one because of its truth value.
        if req is None:
            raise KeyError("Cassette '{c}' does not exist in library.".format(
                c=cassette_name))

        req.rewind()
        return req

    def get_all_available(self):
        """Return all available cassettes (the keys of the loaded data)."""
        return self.data.keys()
class DirectoryCassetteLibrary(CassetteLibrary):
    """A CassetteLibrary that stores and manages requests with directory.

    `self.filename` is the directory; every cassette is written to its own
    file inside it (see :meth:`generate_filename`).
    """

    def __init__(self, *args, **kwargs):
        super(DirectoryCassetteLibrary, self).__init__(*args, **kwargs)

        # In-memory responses, keyed by cassette name.
        self.data = {}

    @staticmethod
    def _cache_response(path, file_hash, response):
        """Cache one decoded response under the path of its cassette file.

        Lookups in `_load_request_from_file` are done by cassette file path,
        so entries must be stored under that same key (and not under the
        directory path) for the cache to ever hit.
        """
        CassetteLibrary.cache[path] = {
            'hash': file_hash,
            'data': response
        }

    def generate_filename(self, cassette_name):
        """Generate the filename for a given cassette name.

        '/', ':' and ' ' are replaced by '_' and the encoder's file
        extension is appended.
        """
        for character in ('/', ':', ' '):
            cassette_name = cassette_name.replace(character, '_')

        return cassette_name + self.encoder.file_ext

    def generate_path_from_cassette_name(self, cassette_name):
        """Generate the full path to cassette file."""
        return os.path.join(
            self.filename, self.generate_filename(cassette_name))

    def write_to_file(self):
        """Write mocked responses to a directory of files.

        The directory is created if it does not exist yet; each in-memory
        response is encoded and written to its own file.
        """
        if not os.path.exists(self.filename):
            os.mkdir(self.filename)

        for cassette_name, response in self.data.items():
            path = self.generate_path_from_cassette_name(cassette_name)
            encoded_str = self.encoder.dump(response.to_dict())

            with open(path, 'w') as f:
                f.write(encoded_str)

            # Update our hash
            self._cache_response(path, _hash(encoded_str), response)

        self.is_dirty = False

    def __contains__(self, cassette_name):
        """Return whether or not the cassette already exists.

        The method first checks if it is already stored in memory. If not, it
        will check if a file supporting the cassette name exists.
        """
        contains = cassette_name in self.data

        if not contains:
            # Check file directory if it exists
            filename = self.generate_path_from_cassette_name(cassette_name)
            contains = os.path.exists(filename)

        self._log_contains(cassette_name, contains)

        return contains

    def __getitem__(self, cassette_name):
        """Return the request if it is in memory. Otherwise, look to disk.

        The response is rewound before being returned.

        :raises KeyError: if no response could be loaded for the name
        """
        if cassette_name in self.data:
            req = self.data[cassette_name]
        else:
            # If not in self.data, need to fetch from disk
            req = self._load_request_from_file(cassette_name)

        if req is None:
            raise KeyError('Cassette %s does not exist in library.'
                           % cassette_name)

        req.rewind()
        return req

    def _load_request_from_file(self, cassette_name):
        """Return the mocked response object from the encoded file.

        If the cassette file is in the cache with a matching content hash,
        the cached response is used. Otherwise the file content is decoded
        and cached.

        :returns: the mocked response, or ``None`` when the file decodes to
            empty content.
        """
        filename = self.generate_path_from_cassette_name(cassette_name)

        with open(filename) as f:
            encoded_str = f.read()

        self.log_cassette_used(self.generate_filename(cassette_name))

        encoded_hash = _hash(encoded_str)

        # If the contents are cached, return them
        cached_result = CassetteLibrary.cache.get(filename, None)
        if cached_result and cached_result['hash'] == encoded_hash:
            return cached_result['data']

        # Otherwise, parse the contents
        content = self.encoder.load(encoded_str)
        if not content:
            # Nothing to build a response from; let the caller report it.
            return None

        req = MockedHTTPResponse.from_dict(content)

        # Cache the file for later
        self._cache_response(filename, encoded_hash, req)

        return req

    # Override
    def cassette_name_for_httplib_connection(self, host, port, method, url,
                                             body, headers):
        """Create a cassette name from an httplib request.

        Unlike the base class, the body is hashed (``will_hash_body=True``).
        """
        return CassetteName.from_httplib_connection(
            host, port, method, url, body, headers, will_hash_body=True)

    def get_all_available(self):
        """Return all available cassettes (entries of the directory)."""
        return os.listdir(self.filename)