#!/usr/bin/env python
# encoding:utf-8
"""
This module holds the class and methods to manage an iso store::
repository_dir
└── iso
├── $project1
│ │ ├── $version
│ │ | ├── $iso1
│ | │ ├── $iso1.md5sum
│ │ | └── $iso1.md5sum.sig
│ │ └── ...
│ └── ...
└── ...
"""
import os
import re
import logging
from getpass import getpass
from . import ArtifactStore
from ..utils import (
save_file,
list_files,
sign_detached,
)
from ..artifact import (
Artifact,
ArtifactName,
ArtifactList,
)
logger = logging.getLogger(__name__)
ISO_REGEX = r'(.*/)?(?P<name>[^\d/]+).(?P<version>\d[^/]*)\.iso'
[docs]class WrongIsoError(Exception):
"""
Any iso failure
"""
pass
[docs]class Iso(Artifact):
def __init__(self, path, temp_dir, verify_ssl=True):
nv_match = re.match(ISO_REGEX, path)
if not nv_match:
raise WrongIsoError(
"Can't extract name and version from %s"
% path,
)
self._name = nv_match.groupdict().get('name')
self._version = nv_match.groupdict().get('version')
super(Iso, self).__init__(
path=path,
temp_dir=temp_dir,
verify_ssl=verify_ssl,
)
with open(self.path) as fdno:
self.inode = os.fstat(fdno.fileno()).st_ino
@property
def name(self):
return self._name
@property
def full_name(self):
"""
Unique ISO Name.
This property should uniquely identify an ISO entity, in the
sense that if you have two isos with the same full_name they must
package the same content or one of them is wrongly generated (the
version was not bumped or something).
"""
return '%s(%s %s)' % (self.type, self.name, self.version)
@property
def version(self):
return self._version
@property
def extension(self):
return '.iso'
@property
def type(self):
return 'iso'
[docs] def sign(self, key, passwd):
with open(self.path + '.md5sum', 'w') as md5_fd:
md5_fd.write(self.md5)
sign_detached(self.path + '.md5sum', key=key, passphrase=passwd)
[docs]class IsoStore(ArtifactStore):
"""
Represents the repository sctructure, it does not require that the repo has
the structure specified in the module doc when loading it, but when adding
new isos it will create the new files in that directory structure.
Configuration options:
* temp_dir
Temporary dir to store any transient downloads (like isos from
urls). The caller should make sure it exists and clean it up if needed.
* path_prefix
Prefixes of this store inside the globl artifact repository, separated
by commas
* signing_key
Path to the gpg keey to sign the isos with, will not sign them if not
set
* signing_passphrase
Passphrase for the above key
"""
CONFIG_SECTION = 'IsoStore'
DEFAULT_CONFIG = {
'temp_dir': 'generate',
'path_prefix': 'iso',
'signing_key': '',
'signing_passphrase': 'ask',
}
def __init__(self, config, repo_path=None):
"""
:param path: Path to the repository directory, if passed it will
automatically add all the isos under it to the repo if any.
"""
ArtifactStore.__init__(
self,
config=config,
artifacts=ArtifactList('isos'),
)
self.name = self.__class__.__name__
self._path_prefix = config.get('path_prefix').split(',')
self.path = repo_path or ('Non persisten %s' % self.name)
self.to_copy = []
self.sign_key = config.get('signing_key')
self.sign_passphrase = config.get('signing_passphrase')
if self.sign_key and self.sign_passphrase == 'ask':
self.sign_passphrase = getpass('Key passphrase: ')
if repo_path:
logger.info('Loading repo %s', repo_path)
for iso in list_files(repo_path, '.iso'):
self.add_artifact(
iso,
to_copy=False,
hidelog=True,
)
logger.info('Repo %s loaded', repo_path)
@property
def path_prefix(self):
return self._path_prefix
[docs] def handles_artifact(self, artifact_str):
logger.debug('Checking if %s is an iso', artifact_str)
res = re.match(ISO_REGEX, artifact_str)
if res:
logger.debug(' It is')
return True
else:
logger.debug(' It is not')
return False
[docs] def add_artifact(self, iso, **args):
self.add_iso(iso, **args)
[docs] def add_iso(self, iso, onlyifnewer=False, to_copy=True, hidelog=False):
"""
Generic functon to add an iso package to the repo.
:param iso: path or url to the iso file to add
:param onlyifnewer: If set to True, will only add the package if it's
not there already or the version is newer than the on already
there.
:param to_copy: If set to True, will add that package to the list of
packages to copy into the repo when saving, usually used when
adding new packages to the repo.
:param hidelog: If set to True will not show the extra information
(used when loading a repository to avoid verbose output)
"""
iso = Iso(
iso,
temp_dir=self.config.get('temp_dir'),
verify_ssl=self.config.getboolean('verify_ssl')
)
if self.artifacts.add_pkg(iso, onlyifnewer):
if to_copy:
self.to_copy.append(iso)
if not hidelog:
logger.info('Adding iso %s to repo %s', iso.path, self.path)
else:
if not hidelog:
logger.info("Not adding %s, there's already an equal or "
"newer version", iso)
[docs] def save(self, **args):
self._save(**args)
def _save(self, onlylatest=False):
"""
Copy all the extra isos added to the repository and save it's state.
:param onlylatest: Only copy the latest version of the added isos.
"""
logger.info('Saving new added isos into %s', self.path)
for iso in self.to_copy:
if onlylatest and not self.is_latest_version(iso):
logger.info('Skipping %s a newer version is already '
'in the repo.', iso)
continue
dst_path = os.path.join(self.path,
self.path_prefix[0],
iso.generate_path())
save_file(iso.path, dst_path)
iso.path = dst_path
if self.sign_key:
logger.info('')
logger.info('Signing isos')
self.sign_isos()
logger.info('')
logger.info('Saved %s\n', self.path)
[docs] def is_latest_version(self, iso):
"""
Check if the given iso is the latest version in the repo
:param iso: ISO instance of the package to compare
"""
verlist = self.artifacts.get(iso.full_name, {})
if not verlist or iso.version in verlist.get_latest():
return True
return False
[docs] def delete_old(self, keep=1, noop=False):
"""
Delete the oldest versions for each package from the repo
:param keep: Maximium number of versions to keep of each package
:param noop: If set, will only log what will be done, not actually
doing anything.
"""
new_isos = ArtifactList(self.artifacts)
for name, versions in self.artifacts.iteritems():
if len(versions) <= keep:
continue
to_keep = ArtifactName()
for _ in range(keep):
latest = versions.get_latest()
to_keep.update(latest)
versions.pop(latest.keys()[0])
new_isos[name] = to_keep
for version in versions.keys():
logger.info('Deleting %s version %s', name, version)
versions.del_version(version, noop)
self.artifacts = new_isos
[docs] def get_artifacts(self, regmatch=None, fmatch=None):
"""
Get the list of isos, filtered or not.
:param regmatch: Regular expression that will be applied to the path of
each package to filter it
:param fmatch: Filter function that must return True for a package to
be selected, will be passed the iso object as only parameter
"""
return self.artifacts.get_artifacts(
regmatch=regmatch,
fmatch=fmatch)
[docs] def get_latest(self, regmatch=None, fmatch=None, num=1):
"""
Return the num latest versions for each iso in the repo
:param num: number of latest versions to return
"""
return self.artifacts.get_artifacts(
regmatch=regmatch,
fmatch=fmatch,
latest=num,
)
[docs] def sign_isos(self):
"""
Sign all the isos in the repo.
"""
passphrase = self.sign_passphrase
for iso in self.get_artifacts():
logger.info('Signing %s', iso)
iso.sign(self.sign_key, passphrase)
logger.info("Done signing")
[docs] def change_path(self, new_path):
"""
Changes the store path to the given one, copying any artifacts if
needed
Args:
new_path (str): New path to set
Returns:
None
"""
self.path = new_path
self.to_copy.extend(self.get_artifacts())