#
#   Cython -- Things that don't belong
#   anywhere else in particular
#

import os, sys, re, codecs

# Shorthand: modification time of a file, in seconds since the epoch.
modification_time = os.path.getmtime


def cached_function(f):
    """Decorator that memoises *f* on its (hashable) positional arguments."""
    cache = {}
    uncomputed = object()
    def wrapper(*args):
        res = cache.get(args, uncomputed)
        if res is uncomputed:
            res = cache[args] = f(*args)
        return res
    return wrapper


def cached_method(f):
    """Decorator that memoises a method in a per-instance cache attribute."""
    cache_name = '__%s_cache' % f.__name__
    def wrapper(self, *args):
        cache = getattr(self, cache_name, None)
        if cache is None:
            cache = {}
            setattr(self, cache_name, cache)
        if args in cache:
            return cache[args]
        res = cache[args] = f(self, *args)
        return res
    return wrapper


def replace_suffix(path, newsuf):
    """Return *path* with its extension (if any) replaced by *newsuf*."""
    base, _ = os.path.splitext(path)
    return base + newsuf


def open_new_file(path):
    """Open *path* for writing, deleting any existing file first.

    Deleting first guarantees a genuinely new inode, so output files can be
    safely hard linked without clobbering the link target.
    """
    if os.path.exists(path):
        # Make sure to create a new file here so we can
        # safely hard link the output files.
        os.unlink(path)

    # we use the ISO-8859-1 encoding here because we only write pure
    # ASCII strings or (e.g. for file names) byte encoded strings as
    # Unicode, so we need a direct mapping from the first 256 Unicode
    # characters to a byte sequence, which ISO-8859-1 provides
    return codecs.open(path, "w", encoding="ISO-8859-1")


def castrate_file(path, st):
    #  Remove junk contents from an output file after a
    #  failed compilation.
    #  Also sets access and modification times back to
    #  those specified by st (a stat struct).
    try:
        f = open_new_file(path)
    except EnvironmentError:
        # best effort only: the file may be unwritable or already gone
        pass
    else:
        f.write(
            "#error Do not use this file, it is the result of a failed Cython compilation.\n")
        f.close()
        if st:
            # roll the mtime back one second so build tools see it as stale
            os.utime(path, (st.st_atime, st.st_mtime-1))


def file_newer_than(path, time):
    """Return True if the file at *path* was modified later than *time*."""
    ftime = modification_time(path)
    return ftime > time


@cached_function
def search_include_directories(dirs, qualified_name, suffix, pos,
                               include=False, sys_path=False):
    # Search the list of include directories for the given
    # file name. If a source file position is given, first
    # searches the directory containing that file. Returns
    # None if not found, but does not report an error.
    # The 'include' option will disable package dereferencing.
    # If 'sys_path' is True, also search sys.path.
    if sys_path:
        dirs = dirs + tuple(sys.path)
    if pos:
        file_desc = pos[0]
        from Cython.Compiler.Scanning import FileSourceDescriptor
        if not isinstance(file_desc, FileSourceDescriptor):
            raise RuntimeError("Only file sources for code supported")
        if include:
            dirs = (os.path.dirname(file_desc.filename),) + dirs
        else:
            dirs = (find_root_package_dir(file_desc.filename),) + dirs

    dotted_filename = qualified_name
    if suffix:
        dotted_filename += suffix
    if not include:
        names = qualified_name.split('.')
        package_names = tuple(names[:-1])
        module_name = names[-1]
        module_filename = module_name + suffix
        package_filename = "__init__" + suffix

    for dir in dirs:
        path = os.path.join(dir, dotted_filename)
        if path_exists(path):
            return path
        if not include:
            package_dir = check_package_dir(dir, package_names)
            if package_dir is not None:
                path = os.path.join(package_dir, module_filename)
                if path_exists(path):
                    return path
                # BUG FIX: 'package_dir' already starts with 'dir' (see
                # check_package_dir), so the old os.path.join(dir,
                # package_dir, ...) duplicated the prefix for relative
                # include directories and the "module is itself a package"
                # lookup could never succeed.
                path = os.path.join(package_dir, module_name,
                                    package_filename)
                if path_exists(path):
                    return path
    return None


@cached_function
def find_root_package_dir(file_path):
    """Walk upwards from *file_path* past all package directories (those
    containing an __init__ file) and return the first non-package parent."""
    dir = os.path.dirname(file_path)
    while is_package_dir(dir):
        parent = os.path.dirname(dir)
        if parent == dir:
            # reached the filesystem root
            break
        dir = parent
    return dir


@cached_function
def check_package_dir(dir, package_names):
    """Return *dir* extended by the chain of *package_names* if every step
    is a package directory, otherwise None."""
    for dirname in package_names:
        dir = os.path.join(dir, dirname)
        if not is_package_dir(dir):
            return None
    return dir


@cached_function
def is_package_dir(dir_path):
    """Return a true value if *dir_path* contains an __init__ file
    (falls off the end, i.e. returns None, otherwise)."""
    for filename in ("__init__.py",
                     "__init__.pyx",
                     "__init__.pxd"):
        path = os.path.join(dir_path, filename)
        if path_exists(path):
            return 1


@cached_function
def path_exists(path):
    # try on the filesystem first
    if os.path.exists(path):
        return True
    # figure out if a PEP 302 loader is around
    try:
        loader = __loader__
        # XXX the code below assumes a 'zipimport.zipimporter' instance
        # XXX should be easy to generalize, but too lazy right now to write it
        archive_path = getattr(loader, 'archive', None)
        if archive_path:
            normpath = os.path.normpath(path)
            if normpath.startswith(archive_path):
                arcname = normpath[len(archive_path)+1:]
                try:
                    loader.get_data(arcname)
                    return True
                except IOError:
                    return False
    except NameError:
        pass
    return False


# file name encodings

def decode_filename(filename):
    """Decode a byte string file name to unicode using the filesystem
    encoding; unicode input is returned unchanged.

    NOTE(review): Py2-era helper -- 'unicode' does not exist on Py3,
    where this function would raise NameError for any input.
    """
    if isinstance(filename, unicode):
        return filename
    try:
        filename_encoding = sys.getfilesystemencoding()
        if filename_encoding is None:
            filename_encoding = sys.getdefaultencoding()
        filename = filename.decode(filename_encoding)
    except UnicodeDecodeError:
        # keep the undecodable byte string rather than failing
        pass
    return filename


# support for source file encoding detection

# PEP 263 style declaration, e.g. "# -*- coding: latin-1 -*-".
# FIX: backslashes are doubled (same runtime pattern) so the unicode
# literal no longer contains invalid \s / \w escape sequences, which
# newer Python versions warn about.
_match_file_encoding = re.compile(u"coding[:=]\\s*([-\\w.]+)").search


def detect_file_encoding(source_filename):
    """Open *source_filename* and return its declared source encoding."""
    f = open_source_file(source_filename, encoding="UTF-8", error_handling='ignore')
    try:
        return detect_opened_file_encoding(f)
    finally:
        f.close()


def detect_opened_file_encoding(f):
    """Return the PEP 263 encoding declared in the first two lines of the
    open text stream *f*, or "UTF-8" if none is declared."""
    # PEPs 263 and 3120
    # Most of the time the first two lines fall in the first 250 chars,
    # and this bulk read/split is much faster.
    lines = f.read(250).split("\n")
    if len(lines) > 2:
        m = _match_file_encoding(lines[0]) or _match_file_encoding(lines[1])
        if m:
            return m.group(1)
        else:
            return "UTF-8"
    else:
        # Fallback to one-char-at-a-time detection.
        f.seek(0)
        chars = []
        for i in range(2):  # test the first two lines only
            c = f.read(1)
            while c and c != u'\n':
                chars.append(c)
                c = f.read(1)
        encoding = _match_file_encoding(u''.join(chars))
        if encoding:
            return encoding.group(1)
    return "UTF-8"


# substitute any newline convention (\r\n, \r, \n) in one pass
normalise_newlines = re.compile(u'\r\n?|\n').sub


class NormalisedNewlineStream(object):
    """The codecs module doesn't provide universal newline support.
    This class is used as a stream wrapper that provides this
    functionality.  The new 'io' in Py2.6+/3.x supports this out of the
    box.
    """

    def __init__(self, stream):
        # let's assume .read() doesn't change
        self.stream = stream
        self._read = stream.read
        self.close = stream.close
        self.encoding = getattr(stream, 'encoding', 'UTF-8')

    def read(self, count=-1):
        data = self._read(count)
        if u'\r' not in data:
            return data
        if data.endswith(u'\r'):
            # may be missing a '\n'
            data += self._read(1)
        return normalise_newlines(u'\n', data)

    def readlines(self):
        content = []
        data = self.read(0x1000)
        while data:
            content.append(data)
            data = self.read(0x1000)
        return u''.join(content).splitlines(True)

    def seek(self, pos):
        # only rewinding to the start is supported
        if pos == 0:
            self.stream.seek(0)
        else:
            raise NotImplementedError


io = None
if sys.version_info >= (2, 6):
    try:
        import io
    except ImportError:
        pass


def open_source_file(source_filename, mode="r",
                     encoding=None, error_handling=None,
                     require_normalised_newlines=True):
    """Open a source file with its declared encoding.

    If *encoding* is None, it is auto-detected (PEP 263); files may also be
    loaded from a surrounding zipimport archive if not on the filesystem.
    """
    if encoding is None:
        # Most of the time the coding is unspecified, so be optimistic that
        # it's UTF-8.
        f = open_source_file(source_filename, encoding="UTF-8", mode=mode,
                             error_handling='ignore')
        encoding = detect_opened_file_encoding(f)
        if (encoding == "UTF-8"
                and error_handling == 'ignore'
                and require_normalised_newlines):
            # the optimistic open already matches -- just rewind and reuse it
            f.seek(0)
            return f
        else:
            f.close()
    #
    if not os.path.exists(source_filename):
        try:
            loader = __loader__
            if source_filename.startswith(loader.archive):
                return open_source_from_loader(
                    loader, source_filename,
                    encoding, error_handling,
                    require_normalised_newlines)
        except (NameError, AttributeError):
            pass
    #
    if io is not None:
        return io.open(source_filename, mode=mode,
                       encoding=encoding, errors=error_handling)
    else:
        # codecs module doesn't have universal newline support
        stream = codecs.open(source_filename, mode=mode,
                             encoding=encoding, errors=error_handling)
        if require_normalised_newlines:
            stream = NormalisedNewlineStream(stream)
        return stream


def open_source_from_loader(loader,
                            source_filename, encoding=None,
                            error_handling=None,
                            require_normalised_newlines=True):
    """Open a source file's data through a PEP 302 (zipimport) loader."""
    nrmpath = os.path.normpath(source_filename)
    arcname = nrmpath[len(loader.archive)+1:]
    data = loader.get_data(arcname)
    if io is not None:
        return io.TextIOWrapper(io.BytesIO(data),
                                encoding=encoding,
                                errors=error_handling)
    else:
        try:
            import cStringIO as StringIO
        except ImportError:
            import StringIO
        reader = codecs.getreader(encoding)
        stream = reader(StringIO.StringIO(data))
        if require_normalised_newlines:
            stream = NormalisedNewlineStream(stream)
        return stream


def str_to_number(value):
    """Convert a numeric literal string to an int, honouring hex, Py2/Py3
    octal and binary prefixes."""
    # note: this expects a string as input that was accepted by the
    # parser already
    if len(value) < 2:
        # single digit -- no prefix possible
        value = int(value, 0)
    elif value[0] == '0':
        if value[1] in 'xX':
            # hex notation ('0x1AF')
            value = int(value[2:], 16)
        elif value[1] in 'oO':
            # Py3 octal notation ('0o136')
            value = int(value[2:], 8)
        elif value[1] in 'bB':
            # Py3 binary notation ('0b101')
            value = int(value[2:], 2)
        else:
            # Py2 octal notation ('0136')
            value = int(value, 8)
    else:
        value = int(value, 0)
    return value


def long_literal(value):
    """Return True if the literal (string or int) overflows a signed
    32 bit C int.

    NOTE(review): Py2-era helper -- 'basestring' raises NameError on Py3.
    """
    if isinstance(value, basestring):
        value = str_to_number(value)
    return not -2**31 <= value < 2**31


# all() and any() are new in 2.5
try:
    # Make sure to bind them on the module, as they will be accessed as
    # attributes
    all = all
    any = any
except NameError:
    def all(items):
        for item in items:
            if not item:
                return False
        return True

    def any(items):
        for item in items:
            if item:
                return True
        return False


@cached_function
def get_cython_cache_dir():
    """get the cython cache dir

    Priority:

    1. CYTHON_CACHE_DIR
    2. (OS X): ~/Library/Caches/Cython
       (posix not OS X): XDG_CACHE_HOME/cython if XDG_CACHE_HOME defined
    3. ~/.cython

    """
    if 'CYTHON_CACHE_DIR' in os.environ:
        return os.environ['CYTHON_CACHE_DIR']

    parent = None
    if os.name == 'posix':
        if sys.platform == 'darwin':
            parent = os.path.expanduser('~/Library/Caches')
        else:
            # this could fallback on ~/.cache
            parent = os.environ.get('XDG_CACHE_HOME')

    # if the parent directory is there, use it
    if parent and os.path.isdir(parent):
        return os.path.join(parent, 'cython')

    # last fallback: ~/.cython
    return os.path.expanduser(os.path.join('~', '.cython'))