Commit d67b0013 authored by Hilal Ozdemir's avatar Hilal Ozdemir Committed by Hilal Ozdemir

fixes in errors and error handling logic

parent 671854ac
......@@ -381,11 +381,11 @@ def clean_keys(dict_: dict, keys: list) -> None:
if key in dict_:
del dict_[key]
async def until_event(emitter: EventEmitter, name: str, loop=asyncio.get_event_loop()):
def until_event(emitter: EventEmitter, name: str, loop=asyncio.get_event_loop()):
"""Wait task until triggered the event"""
future: asyncio.Future = asyncio.Future(loop=loop)
# not consider to duplicate registrations of event
emitter.once(name, lambda: future.set_result(None))
await future
return future
......@@ -311,12 +311,10 @@ class ImageList(RPCEndpoint):
if whole_image_only:
await self.pull_routine(image_identifier, node_identifier,
self, wait, show_progress, force)
else:
# distributed layer-by-layer download
await self.pull_routine_distributed(image_identifier, self, wait, show_progress)
@staticmethod
async def pull_routine_distributed(tag_or_digest: str, rpc_endpoint: "RPCEndpoint" = None, # pylint: disable=too-many-locals,too-many-branches, too-many-statements
wait: bool = False, show_progress: bool = False) -> None:
......@@ -324,25 +322,51 @@ class ImageList(RPCEndpoint):
"""
Services.logger.debug("Will fetch %s", tag_or_digest) # type: ignore
if not wait and not show_progress and rpc_endpoint is not None:
rpc_endpoint.write({'started':True})
rpc_endpoint.finish()
if show_progress:
rpc_endpoint.write('{"image":"%s","progress":[' % tag_or_digest) # type: ignore
rpc_endpoint.flush() # type: ignore
jobid = uuid.uuid4().hex
Services.docker_util.create_emitter(jobid) # type: ignore
config_future = asyncio.ensure_future(
Services.docker_util.create_or_download_config(tag_or_digest, jobid) # type: ignore
)
await until_event(
first_layer_start_task = until_event(
Services.docker_util.emitters[jobid], # type: ignore
Services.docker_util.EVENT_START_LAYER_DOWNLOAD # type: ignore
)
config_future = asyncio.ensure_future(
Services.docker_util.create_or_download_config(tag_or_digest, jobid) # type: ignore
)
def when_done(task):
if task.exception():
Services.logger.error("create_or_download_config error: {}".format(task.exception()))
first_layer_start_task.set_exception(task.exception())
else:
Services.logger.debug("create_or_download_config done.")
config_future.add_done_callback(when_done)
try:
await first_layer_start_task
except Exception as e:
if show_progress:
rpc_endpoint.write('{"progress":[{"error":"%s"}]}'%str(e))
rpc_endpoint.finish()
else:
rpc_endpoint.write('{"resp_info":[{"error":"%s"}]}'%str(e))
rpc_endpoint.finish()
return
if rpc_endpoint is not None:
if not wait:
rpc_endpoint.write('{"resp_info":[{"started":true}]}')
rpc_endpoint.finish()
else:
rpc_endpoint.write('{"resp_info":[{"started":true}')
rpc_endpoint.flush()
if show_progress:
rpc_endpoint.write('],"image":"%s","progress":[' % tag_or_digest) # type: ignore
rpc_endpoint.flush() # type: ignore
else:
rpc_endpoint.write(',')
rpc_endpoint.flush()
def format_progress(digest: str, status: str, progress: int = 100):
"""generate json dictionary for sending progress of layer downloading"""
return '{"digest": "%s", "status": "%s", "progress": %d},' % (digest, status, progress)
......@@ -392,10 +416,6 @@ class ImageList(RPCEndpoint):
config_json_str, image_id, _ = await config_future
del Services.docker_util.queues[jobid] # type: ignore
if show_progress:
rpc_endpoint.write(format_progress('done', 'done')[:-1]) # type: ignore
rpc_endpoint.flush() # type: ignore
# Do we need to save repo_digest to database?
# config_json_str, image_id, _ = \
# await Services.docker_util.create_or_download_config(tag_or_digest) # type: ignore
......@@ -410,12 +430,8 @@ class ImageList(RPCEndpoint):
# image.repo_digests.add(repo_digest)
# image.save()
if wait and not show_progress:
rpc_endpoint.write({'finished':True}) # type: ignore
rpc_endpoint.finish() # type: ignore
if show_progress:
rpc_endpoint.write(']}') # type: ignore
if wait:
rpc_endpoint.write('{"finished":true}]}') # type: ignore
rpc_endpoint.finish() # type: ignore
@staticmethod
......
......@@ -26,6 +26,7 @@ import asyncio
# import progressbar
import click
from tabulate import tabulate
import json
from beiran.util import json_streamer
from beiran.util import sizeof_fmt
......@@ -57,10 +58,10 @@ def image():
@click.option('--from', 'node', default=None,
help='Pull from spesific node')
@click.option('--wait', 'wait', default=False, is_flag=True,
help='Waits result of pulling image')
help='Wait the result of pulling image')
@click.option('--force', 'force', default=False, is_flag=True,
help='Forces download of image even if the node is not recognised')
@click.option('--noprogress', 'noprogress', default=False, is_flag=True,
@click.option('--no-progress', 'noprogress', default=False, is_flag=True,
help='Disable image transfer progress display')
@click.option('--whole-image-only', 'whole_image_only', default=False, is_flag=True,
help='Pull an image from other node (not each layer)')
......@@ -83,76 +84,119 @@ def image_pull(ctx, node: str, wait: bool, force: bool, noprogress: bool,
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=wait,
wait=True,
force=force,
whole_image_only=whole_image_only,
progress=True,
raise_error=True
)
isFinished = False
async for update in json_streamer(resp.content, '$.progress[::]'):
err = update.get('error')
if err:
click.echo('An error occured while pulling the image. {}'.format(err))
return
if update.get('finished'):
click.echo("Image pulling process finished.")
isFinished = True
break
progbar.update(update['progress'])
progbar.finish()
if not isFinished:
click.echo('An error occured!')
loop = asyncio.get_event_loop()
loop.run_until_complete(_pull_with_progress())
click.echo('done!')
else:
async def _pull_with_progress():
"""Pull image with async client"""
progbars = {}
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=wait,
force=force,
whole_image_only=whole_image_only,
progress=True,
raise_error=True
)
try:
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=True,
force=force,
whole_image_only=whole_image_only,
progress=True,
raise_error=True
)
except Exception as e:
click.echo('An exception is catched while requesting pull image!')
click.echo(str(e))
return
click.echo('Downloading layers...')
lastbar = None
async for data in json_streamer(resp.content, '$.progress[::]'):
err = data.get('error')
if err:
click.echo('An error occured while pulling the image. {}'.format(err))
return
if data.get('finished'):
if lastbar:
lastbar.seek_last_line()
click.echo("Image pulling process finished.")
return
digest = data['digest']
if digest == 'done':
lastbar.seek_last_line()
click.echo('Loading image...')
else:
if digest not in progbars:
if data['status'] == DockerUtil.DL_ALREADY:
progbars[digest] = {
'bar': MultipleProgressBar(
widgets=[digest + ' Already exists']
)
}
else:
progbars[digest] = {
'bar': MultipleProgressBar(desc=digest)
}
progbars[digest]['bar'].update_and_seek(data['progress'])
lastbar = progbars[digest]['bar']
click.echo('done!')
if digest not in progbars:
if data['status'] == DockerUtil.DL_ALREADY:
progbars[digest] = {
'bar': MultipleProgressBar(
widgets=[digest + ' Already exists']
)
}
else:
progbars[digest] = {
'bar': MultipleProgressBar(desc=digest)
}
progbars[digest]['bar'].update_and_seek(data['progress'])
lastbar = progbars[digest]['bar']
if lastbar:
lastbar.seek_last_line()
click.echo('An error occured!')
loop = asyncio.get_event_loop()
loop.run_until_complete(_pull_with_progress())
else:
result = ctx.beiran_client.pull_image(
imagename,
node=node,
wait=wait,
force=force,
whole_image_only=whole_image_only,
progress=False,
raise_error=True
)
if "started" in result:
click.echo("Process is started")
if "finished" in result:
click.echo("Process is finished")
async def _pull_without_progress():
"""Pull image with async client"""
try:
click.echo("Requesting image pull...")
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=wait,
force=force,
whole_image_only=whole_image_only,
progress=False,
raise_error=True
)
except Exception as e:
click.echo('An exception is catched while requesting pull image!')
click.echo(str(e))
return
async for data in json_streamer(resp.content, '$.resp_info[::]'):
err = data.get('error')
if err:
click.echo('An error occured while pulling the image. {}'.format(err))
return
if data.get('started'):
click.echo("Image pulling process started.")
if not wait:
return
if data.get('finished'):
click.echo("Image pulling process finished.")
return
click.echo('An error occured while pulling image!')
loop = asyncio.get_event_loop()
loop.run_until_complete(_pull_without_progress())
# pylint: enable-msg=too-many-arguments
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment