Commit 81365034 authored by Hilal Ozdemir's avatar Hilal Ozdemir Committed by Hilal Ozdemir

refactor and pylint fixes

parent d67b0013
...@@ -92,7 +92,8 @@ async def async_write_file_stream(url: str, save_path: str, queue=None, # pylint ...@@ -92,7 +92,8 @@ async def async_write_file_stream(url: str, save_path: str, queue=None, # pylint
if resp.status >= 400: if resp.status >= 400:
raise Exception("Failed to write stream to file. code: %d"%resp.status) raise Exception("Failed to write stream to file. code: %d"%resp.status)
if not resp.content: if not resp.content:
raise Exception("Failed to write stream to file. Response does not have a body.") raise Exception(
"Failed to write stream to file. Response does not have a body.")
with open(save_path, 'wb')as file: with open(save_path, 'wb')as file:
async for chunk in input_reader(resp.content): async for chunk in input_reader(resp.content):
file.write(chunk) file.write(chunk)
......
...@@ -335,19 +335,20 @@ class ImageList(RPCEndpoint): ...@@ -335,19 +335,20 @@ class ImageList(RPCEndpoint):
) )
def when_done(task): def when_done(task):
if task.exception(): if task.exception():
Services.logger.error("create_or_download_config error: {}".format(task.exception())) Services.logger.error(
"create_or_download_config error: {}".format(task.exception()))
first_layer_start_task.set_exception(task.exception()) first_layer_start_task.set_exception(task.exception())
else: else:
Services.logger.debug("create_or_download_config done.") Services.logger.debug("create_or_download_config done.")
config_future.add_done_callback(when_done) config_future.add_done_callback(when_done)
try: try:
await first_layer_start_task await first_layer_start_task
except Exception as e: except Services.docker_util.ManifestError as err:
if show_progress: if show_progress:
rpc_endpoint.write('{"progress":[{"error":"%s"}]}'%str(e)) rpc_endpoint.write('{"progress":[{"error":"%s"}]}'%str(err))
rpc_endpoint.finish() rpc_endpoint.finish()
else: else:
rpc_endpoint.write('{"resp_info":[{"error":"%s"}]}'%str(e)) rpc_endpoint.write('{"resp_info":[{"error":"%s"}]}'%str(err))
rpc_endpoint.finish() rpc_endpoint.finish()
return return
...@@ -446,7 +447,6 @@ class ImageList(RPCEndpoint): ...@@ -446,7 +447,6 @@ class ImageList(RPCEndpoint):
online_availables = [n for n in available_nodes if n in online_nodes] online_availables = [n for n in available_nodes if n in online_nodes]
if online_availables: if online_availables:
node_identifier = random.choice(online_availables) node_identifier = random.choice(online_availables)
if not node_identifier: if not node_identifier:
raise HTTPError(status_code=404, log_message='Image is not available in cluster') raise HTTPError(status_code=404, log_message='Image is not available in cluster')
......
...@@ -26,13 +26,12 @@ import asyncio ...@@ -26,13 +26,12 @@ import asyncio
# import progressbar # import progressbar
import click import click
from tabulate import tabulate from tabulate import tabulate
import json
from beiran_package_docker.util import DockerUtil
from beiran.util import json_streamer from beiran.util import json_streamer
from beiran.util import sizeof_fmt from beiran.util import sizeof_fmt
from beiran.multiple_progressbar import MultipleProgressBar from beiran.multiple_progressbar import MultipleProgressBar
from beiran.cli import pass_context from beiran.cli import pass_context
from beiran_package_docker.util import DockerUtil
@click.group() @click.group()
...@@ -53,6 +52,117 @@ def image(): ...@@ -53,6 +52,117 @@ def image():
""" """
pass pass
async def _pull_with_progress(ctx, imagename, node, force):
"""Pull image with async client (whole image)"""
progbar = MultipleProgressBar(desc=imagename)
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=True,
force=force,
whole_image_only=True,
progress=True,
raise_error=True
)
is_finished = False
async for update in json_streamer(resp.content, '$.progress[::]'):
err = update.get('error')
if err:
click.echo('An error occured while pulling the image. {}'.format(err))
return
if update.get('finished'):
click.echo("Image pulling process finished.")
is_finished = True
break
progbar.update(update['progress'])
progbar.finish()
if not is_finished:
click.echo('An error occured!')
async def _pull_with_progress_distributed(ctx, imagename, node, force):
"""Pull image with async client (distributed)"""
progbars = {}
try:
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=True,
force=force,
whole_image_only=False,
progress=True,
raise_error=True
)
except Exception as err: # pylint: disable=broad-except
click.echo('An exception is catched while requesting pull image!')
click.echo(str(err))
return
click.echo('Downloading layers...')
lastbar = None
async for data in json_streamer(resp.content, '$.progress[::]'):
err = data.get('error')
if err:
click.echo('An error occured while pulling the image. {}'.format(err))
return
if data.get('finished'):
if lastbar:
lastbar.seek_last_line()
click.echo("Image pulling process finished.")
return
digest = data['digest']
if digest not in progbars:
if data['status'] == DockerUtil.DL_ALREADY:
progbars[digest] = {
'bar': MultipleProgressBar(
widgets=[digest + ' Already exists']
)
}
else:
progbars[digest] = {
'bar': MultipleProgressBar(desc=digest)
}
progbars[digest]['bar'].update_and_seek(data['progress'])
lastbar = progbars[digest]['bar']
if lastbar:
lastbar.seek_last_line()
click.echo('An error occured!')
async def _pull_without_progress(ctx, imagename, node, wait, force, whole_image_only): # pylint: disable=too-many-arguments
"""Pull image with async client"""
try:
click.echo("Requesting image pull...")
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=wait,
force=force,
whole_image_only=whole_image_only,
progress=False,
raise_error=True
)
except Exception as err: # pylint: disable=broad-except
click.echo('An exception is catched while requesting pull image!')
click.echo(str(err))
return
async for data in json_streamer(resp.content, '$.resp_info[::]'):
err = data.get('error')
if err:
click.echo('An error occured while pulling the image. {}'.format(err))
return
if data.get('started'):
click.echo("Image pulling process started.")
if not wait:
return
if data.get('finished'):
click.echo("Image pulling process finished.")
return
click.echo('An error occured while pulling image!')
@image.command('pull') @image.command('pull')
@click.option('--from', 'node', default=None, @click.option('--from', 'node', default=None,
...@@ -74,129 +184,16 @@ def image_pull(ctx, node: str, wait: bool, force: bool, noprogress: bool, ...@@ -74,129 +184,16 @@ def image_pull(ctx, node: str, wait: bool, force: bool, noprogress: bool,
"""Pull a container image from cluster or repository""" """Pull a container image from cluster or repository"""
click.echo( click.echo(
'Pulling image %s from %s!' % (imagename, node or "available nodes")) 'Pulling image %s from %s!' % (imagename, node or "available nodes"))
loop = asyncio.get_event_loop()
if not noprogress: if not noprogress:
if whole_image_only: if whole_image_only:
progbar = MultipleProgressBar(desc=imagename) loop.run_until_complete(_pull_with_progress(ctx, imagename, node, force))
async def _pull_with_progress():
"""Pull image with async client"""
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=True,
force=force,
whole_image_only=whole_image_only,
progress=True,
raise_error=True
)
isFinished = False
async for update in json_streamer(resp.content, '$.progress[::]'):
err = update.get('error')
if err:
click.echo('An error occured while pulling the image. {}'.format(err))
return
if update.get('finished'):
click.echo("Image pulling process finished.")
isFinished = True
break
progbar.update(update['progress'])
progbar.finish()
if not isFinished:
click.echo('An error occured!')
loop = asyncio.get_event_loop()
loop.run_until_complete(_pull_with_progress())
else: else:
async def _pull_with_progress(): loop.run_until_complete(_pull_with_progress_distributed(ctx, imagename, node, force))
"""Pull image with async client"""
progbars = {}
try:
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=True,
force=force,
whole_image_only=whole_image_only,
progress=True,
raise_error=True
)
except Exception as e:
click.echo('An exception is catched while requesting pull image!')
click.echo(str(e))
return
click.echo('Downloading layers...')
lastbar = None
async for data in json_streamer(resp.content, '$.progress[::]'):
err = data.get('error')
if err:
click.echo('An error occured while pulling the image. {}'.format(err))
return
if data.get('finished'):
if lastbar:
lastbar.seek_last_line()
click.echo("Image pulling process finished.")
return
digest = data['digest']
if digest not in progbars:
if data['status'] == DockerUtil.DL_ALREADY:
progbars[digest] = {
'bar': MultipleProgressBar(
widgets=[digest + ' Already exists']
)
}
else:
progbars[digest] = {
'bar': MultipleProgressBar(desc=digest)
}
progbars[digest]['bar'].update_and_seek(data['progress'])
lastbar = progbars[digest]['bar']
if lastbar:
lastbar.seek_last_line()
click.echo('An error occured!')
loop = asyncio.get_event_loop()
loop.run_until_complete(_pull_with_progress())
else: else:
async def _pull_without_progress(): loop.run_until_complete(
"""Pull image with async client""" _pull_without_progress(ctx, imagename, node, wait, force, whole_image_only))
try:
click.echo("Requesting image pull...")
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=wait,
force=force,
whole_image_only=whole_image_only,
progress=False,
raise_error=True
)
except Exception as e:
click.echo('An exception is catched while requesting pull image!')
click.echo(str(e))
return
async for data in json_streamer(resp.content, '$.resp_info[::]'):
err = data.get('error')
if err:
click.echo('An error occured while pulling the image. {}'.format(err))
return
if data.get('started'):
click.echo("Image pulling process started.")
if not wait:
return
if data.get('finished'):
click.echo("Image pulling process finished.")
return
click.echo('An error occured while pulling image!')
loop = asyncio.get_event_loop()
loop.run_until_complete(_pull_without_progress())
# pylint: enable-msg=too-many-arguments # pylint: enable-msg=too-many-arguments
......
...@@ -29,11 +29,6 @@ from aiodocker import Docker ...@@ -29,11 +29,6 @@ from aiodocker import Docker
from aiodocker.exceptions import DockerError from aiodocker.exceptions import DockerError
from peewee import SQL from peewee import SQL
from beiran.config import config
from beiran.plugin import BasePackagePlugin, History
from beiran.models import Node
from beiran.daemon.peer import Peer
from beiran_package_docker.image_ref import del_idpref from beiran_package_docker.image_ref import del_idpref
from beiran_package_docker.models import DockerImage, DockerLayer from beiran_package_docker.models import DockerImage, DockerLayer
from beiran_package_docker.models import MODEL_LIST from beiran_package_docker.models import MODEL_LIST
...@@ -41,6 +36,10 @@ from beiran_package_docker.util import DockerUtil ...@@ -41,6 +36,10 @@ from beiran_package_docker.util import DockerUtil
from beiran_package_docker.api import ROUTES from beiran_package_docker.api import ROUTES
from beiran_package_docker.api import Services as ApiDependencies from beiran_package_docker.api import Services as ApiDependencies
from beiran.config import config
from beiran.plugin import BasePackagePlugin, History
from beiran.models import Node
from beiran.daemon.peer import Peer
PLUGIN_NAME = 'docker' PLUGIN_NAME = 'docker'
PLUGIN_TYPE = 'package' PLUGIN_TYPE = 'package'
......
...@@ -298,7 +298,7 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes ...@@ -298,7 +298,7 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
async def get_digest_by_diffid(self, diffid: str)-> Optional[str]: async def get_digest_by_diffid(self, diffid: str)-> Optional[str]:
"""Return digest of a layer by diff id.""" """Return digest of a layer by diff id."""
try: try:
with open(os.path.join(self.v2metadata_path, del_idpref(diffid)))as file: with open(os.path.join(self.v2metadata_path, del_idpref(diffid))) as file:
content = json.load(file) content = json.load(file)
return content[0]['Digest'] return content[0]['Digest']
except FileNotFoundError: except FileNotFoundError:
...@@ -606,13 +606,17 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes ...@@ -606,13 +606,17 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
# HEAD request for get size # HEAD request for get size
resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT, resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT,
retry=self.RETRY, method='HEAD', Authorization=requirements) retry=self.RETRY, method='HEAD', Authorization=requirements)
if resp.status >= 400: if resp.status >= 400:
raise DockerUtil.LayerDownloadFailed("Failed to download layer from %s. code: %d"%(url, resp.status)) raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. code: %d"%(url, resp.status))
if not resp.content: if not resp.content:
raise DockerUtil.LayerDownloadFailed("Failed to download layer from %s. Response does not have a body."%url) raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. Response does not have a body."%url)
if not resp.headers.get('content-length'): if not resp.headers.get('content-length'):
raise DockerUtil.LayerDownloadFailed("Failed to download layer from %s. Response headers do not have content-length."%url) raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. Response headers do not have content-length."
%url)
layer_size = int(resp.headers.get('content-length')) layer_size = int(resp.headers.get('content-length'))
...@@ -647,11 +651,15 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes ...@@ -647,11 +651,15 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT, resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT,
retry=self.RETRY, method='HEAD') retry=self.RETRY, method='HEAD')
if resp.status >= 400: if resp.status >= 400:
raise DockerUtil.LayerDownloadFailed("Failed to download layer from %s. code: %d"%(url, resp.status)) raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. code: %d"%(url, resp.status))
if not resp.content: if not resp.content:
raise DockerUtil.LayerDownloadFailed("Failed to download layer from %s. Response does not have a body."%url) raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. Response does not have a body."%url)
if not resp.headers.get('content-length'): if not resp.headers.get('content-length'):
raise DockerUtil.LayerDownloadFailed("Failed to download layer from %s. Response headers do not have content-length."%url) raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. " +
"Response headers do not have content-length."%url)
layer_size = int(resp.headers.get('content-length')) layer_size = int(resp.headers.get('content-length'))
...@@ -953,19 +961,17 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes ...@@ -953,19 +961,17 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
# get manifest # get manifest
try: try:
manifest = await self.fetch_docker_image_manifest(ref['domain'], ref['repo'], manifest = await self.fetch_docker_image_manifest(ref['domain'], ref['repo'],
manifest_digest) manifest_digest)
except DockerUtil.FetchManifestFailed as e: except DockerUtil.FetchManifestFailed as err:
raise DockerUtil.ManifestError(str(e)) raise DockerUtil.ManifestError(str(err))
schema_v = manifest['schemaVersion'] if manifest['schemaVersion'] == 1:
if schema_v == 1:
# pull layers and create config from version 1 manifest # pull layers and create config from version 1 manifest
config_json_str, config_digest, _ = await self.fetch_config_schema_v1( config_json_str, config_digest, _ = await self.fetch_config_schema_v1(
ref, manifest, jobid ref, manifest, jobid
) )
elif schema_v == 2: elif manifest['schemaVersion'] == 2:
if manifest['mediaType'] == 'application/vnd.docker.distribution.manifest.v2+json': if manifest['mediaType'] == 'application/vnd.docker.distribution.manifest.v2+json':
# pull layers using version 2 manifest # pull layers using version 2 manifest
config_json_str, config_digest, _ = await self.fetch_config_schema_v2( config_json_str, config_digest, _ = await self.fetch_config_schema_v2(
...@@ -974,7 +980,7 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes ...@@ -974,7 +980,7 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
else: else:
raise DockerUtil.ManifestError('Invalid media type: %d' % manifest['mediaType']) raise DockerUtil.ManifestError('Invalid media type: %d' % manifest['mediaType'])
else: else:
raise DockerUtil.ManifestError('Invalid schema version: %d' % schema_v) raise DockerUtil.ManifestError('Invalid schema version: %d' % manifest['schemaVersion'])
manifestlist_str = json.dumps(manifestlist, indent=3) manifestlist_str = json.dumps(manifestlist, indent=3)
repo_digest = add_idpref(hashlib.sha256(manifestlist_str.encode('utf-8')).hexdigest()) repo_digest = add_idpref(hashlib.sha256(manifestlist_str.encode('utf-8')).hexdigest())
...@@ -1076,8 +1082,8 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes ...@@ -1076,8 +1082,8 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
try: try:
manifest = await self.fetch_docker_image_manifest( manifest = await self.fetch_docker_image_manifest(
ref['domain'], ref['repo'], ref['suffix']) ref['domain'], ref['repo'], ref['suffix'])
except DockerUtil.FetchManifestFailed as e: except DockerUtil.FetchManifestFailed as err:
raise DockerUtil.ManifestError(str(e)) raise DockerUtil.ManifestError(str(err))
schema_v = manifest['schemaVersion'] schema_v = manifest['schemaVersion']
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment