Commit 8dd0ca0d authored by Furkan Mustafa

Merge branch 'error-handling-fixes' into '201903-small-improvements'

Error handling fixes

See merge request !42
parents 4667f5c2 215a7178
@@ -89,7 +89,11 @@ async def async_write_file_stream(url: str, save_path: str, queue=None, # pylint
async with async_timeout.timeout(timeout):
async with session.request(method, url, json=json,
data=data, headers=headers) as resp:
if resp.status >= 400:
raise Exception("Failed to write stream to file. code: %d"%resp.status)
if not resp.content:
raise Exception(
"Failed to write stream to file. Response does not have a body.")
with open(save_path, 'wb') as file:
async for chunk in input_reader(resp.content):
file.write(chunk)
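The two new guard clauses above make async_write_file_stream fail fast on HTTP errors and empty bodies instead of silently writing a bad response to disk. A minimal standalone sketch of the same pattern, assuming aiohttp (fetch_to_file and the chunk size are illustrative, not part of this MR):

import aiohttp

async def fetch_to_file(url: str, save_path: str) -> None:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # Fail fast instead of saving an HTML error page as layer data.
            if resp.status >= 400:
                raise Exception("Failed to write stream to file. code: %d" % resp.status)
            with open(save_path, 'wb') as file:
                # 64 KiB chunks via aiohttp's StreamReader.iter_chunked()
                async for chunk in resp.content.iter_chunked(64 * 1024):
                    file.write(chunk)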
@@ -381,11 +381,11 @@ def clean_keys(dict_: dict, keys: list) -> None:
if key in dict_:
del dict_[key]
async def until_event(emitter: EventEmitter, name: str, loop=asyncio.get_event_loop()):
def until_event(emitter: EventEmitter, name: str, loop=asyncio.get_event_loop()):
"""Wait task until triggered the event"""
future: asyncio.Future = asyncio.Future(loop=loop)
# duplicate registrations of the same event are not handled
emitter.once(name, lambda: future.set_result(None))
await future
return future
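This hunk is the enabling change for the fixes below: until_event is no longer a coroutine that must be awaited immediately; it returns the asyncio.Future directly, so a caller can create the wait first, start other work, and fail the wait from a callback. A small sketch of the new calling pattern, assuming a pyee-style EventEmitter with once()/emit() (the event name is illustrative):

import asyncio
from pyee import EventEmitter  # assumption: beiran's EventEmitter behaves like pyee's

def until_event(emitter, name, loop=None):
    """Return a future that is resolved when the event is triggered"""
    future = asyncio.Future(loop=loop)
    emitter.once(name, lambda: future.set_result(None))
    return future

async def main():
    emitter = EventEmitter()
    started = until_event(emitter, 'layer-download-start')  # no await yet
    asyncio.get_event_loop().call_soon(emitter.emit, 'layer-download-start')
    await started  # resolves when the event fires

asyncio.get_event_loop().run_until_complete(main())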
@@ -311,12 +311,10 @@ class ImageList(RPCEndpoint):
if whole_image_only:
await self.pull_routine(image_identifier, node_identifier,
self, wait, show_progress, force)
else:
# distributed layer-by-layer download
await self.pull_routine_distributed(image_identifier, self, wait, show_progress)
@staticmethod
async def pull_routine_distributed(tag_or_digest: str, rpc_endpoint: "RPCEndpoint" = None, # pylint: disable=too-many-locals,too-many-branches, too-many-statements
wait: bool = False, show_progress: bool = False) -> None:
@@ -324,25 +322,53 @@ class ImageList(RPCEndpoint):
"""
Services.logger.debug("Will fetch %s", tag_or_digest) # type: ignore
if not wait and not show_progress and rpc_endpoint is not None:
rpc_endpoint.write({'started':True})
rpc_endpoint.finish()
if show_progress:
rpc_endpoint.write('{"image":"%s","progress":[' % tag_or_digest) # type: ignore
rpc_endpoint.flush() # type: ignore
jobid = uuid.uuid4().hex
Services.docker_util.create_emitter(jobid) # type: ignore
config_future = asyncio.ensure_future(
Services.docker_util.create_or_download_config(tag_or_digest, jobid) # type: ignore
)
await until_event(
first_layer_start_task = until_event(
Services.docker_util.emitters[jobid], # type: ignore
Services.docker_util.EVENT_START_LAYER_DOWNLOAD # type: ignore
)
config_future = asyncio.ensure_future(
Services.docker_util.create_or_download_config(tag_or_digest, jobid) # type: ignore
)
def when_done(task):
if task.exception():
Services.logger.error(
"create_or_download_config error: {}".format(task.exception()))
first_layer_start_task.set_exception(task.exception())
else:
Services.logger.debug("create_or_download_config done.")
config_future.add_done_callback(when_done)
try:
await first_layer_start_task
except Exception as err: # pylint: disable=broad-except
if rpc_endpoint:
if show_progress:
rpc_endpoint.write('{"progress":[{"error":"%s"}]}' % str(err))
rpc_endpoint.finish()
else:
rpc_endpoint.write('{"status":[{"error":"%s"}]}' % str(err))
rpc_endpoint.finish()
return
if rpc_endpoint:
if not wait:
rpc_endpoint.write('{"status":[{"started":true}]}')
rpc_endpoint.finish()
else:
rpc_endpoint.write('{"status":[{"started":true}')
rpc_endpoint.flush()
if show_progress:
rpc_endpoint.write('],"image":"%s","progress":[' % tag_or_digest) # type: ignore
rpc_endpoint.flush() # type: ignore
else:
rpc_endpoint.write(',')
rpc_endpoint.flush()
def format_progress(digest: str, status: str, progress: int = 100):
"""generate json dictionary for sending progress of layer downloading"""
return '{"digest": "%s", "status": "%s", "progress": %d},' % (digest, status, progress)
@@ -382,40 +408,37 @@ class ImageList(RPCEndpoint):
else:
return
pro_tasks = [
send_progress(digest)
for digest in Services.docker_util.queues[jobid].keys() # type: ignore
]
pro_future = asyncio.gather(*pro_tasks)
await pro_future
config_json_str, image_id, _ = await config_future
del Services.docker_util.queues[jobid] # type: ignore
if show_progress:
rpc_endpoint.write(format_progress('done', 'done')[:-1]) # type: ignore
rpc_endpoint.flush() # type: ignore
# Do we need to save repo_digest to database?
# config_json_str, image_id, _ = \
# await Services.docker_util.create_or_download_config(tag_or_digest) # type: ignore
tarball_path = await Services.docker_util.create_image_from_tar( # type: ignore
tag_or_digest, config_json_str, image_id)
await Services.docker_util.load_image(tarball_path) # type: ignore
try:
pro_tasks = [
send_progress(digest)
for digest in Services.docker_util.queues[jobid].keys() # type: ignore
]
pro_future = asyncio.gather(*pro_tasks)
await pro_future
config_json_str, image_id, _ = await config_future
del Services.docker_util.queues[jobid] # type: ignore
# Do we need to save repo_digest to database?
# config_json_str, image_id, _ = \
# await Services.docker_util.create_or_download_config(tag_or_digest) # type: ignore
tarball_path = await Services.docker_util.create_image_from_tar( # type: ignore
tag_or_digest, config_json_str, image_id)
await Services.docker_util.load_image(tarball_path) # type: ignore
except Exception as err: # pylint: disable=broad-except
rpc_endpoint.write('{"error":"%s"}]}' % str(err)) # type: ignore
rpc_endpoint.finish() # type: ignore
return
# # save repo_digest ?
# image = DockerImage.get().where(...)
# image.repo_digests.add(repo_digest)
# image.save()
if wait and not show_progress:
rpc_endpoint.write({'finished':True}) # type: ignore
rpc_endpoint.finish() # type: ignore
if show_progress:
rpc_endpoint.write(']}') # type: ignore
if rpc_endpoint and wait:
rpc_endpoint.write('{"finished":true}]}') # type: ignore
rpc_endpoint.finish() # type: ignore
@staticmethod
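Taken together, the writes in this handler stream one JSON document in pieces: the client can parse progress entries as they arrive, and the concatenated fragments remain valid JSON. Roughly, for the wait + show_progress path (digest value illustrative):

import json

fragments = [
    '{"status":[{"started":true}',      # written once the first layer event fires
    '],"image":"alpine","progress":[',  # opens the progress array
    '{"digest": "sha256:ab12", "status": "downloading", "progress": 50},',  # format_progress()
    '{"finished":true}]}',              # closes the progress array and the document
]
doc = json.loads(''.join(fragments))
assert doc['status'] == [{'started': True}]
assert doc['progress'][-1] == {'finished': True}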
@@ -430,7 +453,6 @@ class ImageList(RPCEndpoint):
online_availables = [n for n in available_nodes if n in online_nodes]
if online_availables:
node_identifier = random.choice(online_availables)
if not node_identifier:
raise HTTPError(status_code=404, log_message='Image is not available in cluster')
@@ -23,15 +23,16 @@ Beiran Docker Plugin command line interface module
"""
import asyncio
import sys
# import progressbar
import click
from tabulate import tabulate
from beiran_package_docker.util import DockerUtil
from beiran.util import json_streamer
from beiran.util import sizeof_fmt
from beiran.multiple_progressbar import MultipleProgressBar
from beiran.cli import pass_context
from beiran_package_docker.util import DockerUtil
@click.group()
@@ -52,15 +53,129 @@ def image():
"""
pass
async def _pull_with_progress(ctx, imagename, node, force):
"""Pull image with async client (whole image)"""
progbar = MultipleProgressBar(desc=imagename)
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=True,
force=force,
whole_image_only=True,
progress=True,
raise_error=True
)
is_finished = False
async for update in json_streamer(resp.content, '$.progress[::]'):
err = update.get('error')
if err:
click.echo('An error occurred while pulling the image. {}'.format(err))
return 1
if update.get('finished'):
click.echo("Image pulling process finished.")
is_finished = True
break
progbar.update(update['progress'])
progbar.finish()
if not is_finished:
click.echo('An error occurred!')
return 1
async def _pull_with_progress_distributed(ctx, imagename, node, force):
"""Pull image with async client (distributed)"""
progbars = {}
try:
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=True,
force=force,
whole_image_only=False,
progress=True,
raise_error=True
)
except Exception as err: # pylint: disable=broad-except
click.echo('An exception was caught while requesting the image pull!')
click.echo(str(err))
return 1
click.echo('Downloading layers...')
lastbar = None
async for data in json_streamer(resp.content, '$.progress[::]'):
resp_err = data.get('error')
if resp_err:
click.echo('An error occurred while pulling the image. {}'.format(resp_err))
return 1
if data.get('finished'):
if lastbar:
lastbar.seek_last_line()
click.echo("Image pulling process finished.")
return
digest = data['digest']
if digest not in progbars:
if data['status'] == DockerUtil.DL_ALREADY:
progbars[digest] = {
'bar': MultipleProgressBar(
widgets=[digest + ' Already exists']
)
}
else:
progbars[digest] = {
'bar': MultipleProgressBar(desc=digest)
}
progbars[digest]['bar'].update_and_seek(data['progress'])
lastbar = progbars[digest]['bar']
if lastbar:
lastbar.seek_last_line()
click.echo('An error occurred!')
return 1
async def _pull_without_progress(ctx, imagename, node, wait, force, whole_image_only): # pylint: disable=too-many-arguments
"""Pull image with async client"""
try:
click.echo("Requesting image pull...")
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=wait,
force=force,
whole_image_only=whole_image_only,
progress=False,
raise_error=True
)
except Exception as err: # pylint: disable=broad-except
click.echo('An exception was caught while requesting the image pull!')
click.echo(str(err))
return 1
async for data in json_streamer(resp.content, '$.status[::]'):
resp_err = data.get('error')
if resp_err:
click.echo('An error occurred while pulling the image. {}'.format(resp_err))
return 1
if data.get('started'):
click.echo("Image pulling process started.")
if not wait:
return
if data.get('finished'):
click.echo("Image pulling process finished.")
return
click.echo('An error occurred while pulling the image!')
return 1
@image.command('pull')
@click.option('--from', 'node', default=None,
help='Pull from a specific node')
@click.option('--wait', 'wait', default=False, is_flag=True,
help='Waits result of pulling image')
help='Wait for the result of the image pull')
@click.option('--force', 'force', default=False, is_flag=True,
help='Force downloading the image even if the node is not recognised')
@click.option('--noprogress', 'noprogress', default=False, is_flag=True,
@click.option('--no-progress', 'noprogress', default=False, is_flag=True,
help='Disable image transfer progress display')
@click.option('--whole-image-only', 'whole_image_only', default=False, is_flag=True,
help='Pull the whole image from another node (instead of layer-by-layer)')
@@ -73,86 +188,18 @@ def image_pull(ctx, node: str, wait: bool, force: bool, noprogress: bool,
"""Pull a container image from cluster or repository"""
click.echo(
'Pulling image %s from %s!' % (imagename, node or "available nodes"))
loop = asyncio.get_event_loop()
if not noprogress:
if whole_image_only:
progbar = MultipleProgressBar(desc=imagename)
async def _pull_with_progress():
"""Pull image with async client"""
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=wait,
force=force,
whole_image_only=whole_image_only,
progress=True,
raise_error=True
)
async for update in json_streamer(resp.content, '$.progress[::]'):
progbar.update(update['progress'])
progbar.finish()
loop = asyncio.get_event_loop()
loop.run_until_complete(_pull_with_progress())
click.echo('done!')
return_value = loop.run_until_complete(_pull_with_progress(ctx, imagename, node, force))
else:
async def _pull_with_progress():
"""Pull image with async client"""
progbars = {}
resp = await ctx.async_beiran_client.pull_image(
imagename,
node=node,
wait=wait,
force=force,
whole_image_only=whole_image_only,
progress=True,
raise_error=True
)
click.echo('Downloading layers...')
lastbar = None
async for data in json_streamer(resp.content, '$.progress[::]'):
digest = data['digest']
if digest == 'done':
lastbar.seek_last_line()
click.echo('Loading image...')
else:
if digest not in progbars:
if data['status'] == DockerUtil.DL_ALREADY:
progbars[digest] = {
'bar': MultipleProgressBar(
widgets=[digest + ' Already exists']
)
}
else:
progbars[digest] = {
'bar': MultipleProgressBar(desc=digest)
}
progbars[digest]['bar'].update_and_seek(data['progress'])
lastbar = progbars[digest]['bar']
click.echo('done!')
loop = asyncio.get_event_loop()
loop.run_until_complete(_pull_with_progress())
return_value = loop.run_until_complete(
_pull_with_progress_distributed(ctx, imagename, node, force))
else:
result = ctx.beiran_client.pull_image(
imagename,
node=node,
wait=wait,
force=force,
whole_image_only=whole_image_only,
progress=False,
raise_error=True
)
if "started" in result:
click.echo("Process is started")
if "finished" in result:
click.echo("Process is finished")
return_value = loop.run_until_complete(
_pull_without_progress(ctx, imagename, node, wait, force, whole_image_only))
sys.exit(return_value)
# pylint: enable-msg=too-many-arguments
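A note on the exit paths: each helper above returns 1 on failure and simply falls off the end (None) on success, and sys.exit(None) terminates with status 0 while sys.exit(1) terminates with status 1. That mapping is what the bats tests at the bottom of this MR assert on via "$status". A tiny illustration:

import sys

def run(failed: bool):
    """Mirrors the helpers above: 1 on failure, None on success."""
    return 1 if failed else None

sys.exit(run(False))  # sys.exit(None) -> exit code 0; sys.exit(1) -> exit code 1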
@@ -29,11 +29,6 @@ from aiodocker import Docker
from aiodocker.exceptions import DockerError
from peewee import SQL
from beiran.config import config
from beiran.plugin import BasePackagePlugin, History
from beiran.models import Node
from beiran.daemon.peer import Peer
from beiran_package_docker.image_ref import del_idpref
from beiran_package_docker.models import DockerImage, DockerLayer
from beiran_package_docker.models import MODEL_LIST
@@ -41,6 +36,10 @@ from beiran_package_docker.util import DockerUtil
from beiran_package_docker.api import ROUTES
from beiran_package_docker.api import Services as ApiDependencies
from beiran.config import config
from beiran.plugin import BasePackagePlugin, History
from beiran.models import Node
from beiran.daemon.peer import Peer
PLUGIN_NAME = 'docker'
PLUGIN_TYPE = 'package'
@@ -298,7 +298,7 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
async def get_digest_by_diffid(self, diffid: str) -> Optional[str]:
"""Return digest of a layer by diff id."""
try:
with open(os.path.join(self.v2metadata_path, del_idpref(diffid)))as file:
with open(os.path.join(self.v2metadata_path, del_idpref(diffid))) as file:
content = json.load(file)
return content[0]['Digest']
except FileNotFoundError:
@@ -601,23 +601,33 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT,
retry=self.RETRY, method='HEAD')
if resp.status == 401 or resp.status == 200:
if resp.status == 401:
requirements = await self.get_auth_requirements(resp.headers, **kwargs)
if resp.status == 401:
requirements = await self.get_auth_requirements(resp.headers, **kwargs)
# HEAD request to get size
resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT,
retry=self.RETRY, method='HEAD', Authorization=requirements)
layer_size = int(resp.headers.get('content-length'))
if resp.status >= 400:
raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. code: %d"%(url, resp.status))
if not resp.content:
raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. Response does not have a body."%url)
if not resp.headers.get('content-length'):
raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. Response headers do not have content-length."
%url)
self.queues[jobid][digest]['size'] = layer_size
self.queues[jobid][digest]['status'] = self.DL_GZ_DOWNLOADING
layer_size = int(resp.headers.get('content-length'))
resp = await async_write_file_stream(url, save_path, timeout=self.TIMEOUT_DL_LAYER + \
self.get_additional_time_downlaod(layer_size),
retry=self.RETRY,
queue=self.queues[jobid][digest]['queue'],
Authorization=requirements)
self.queues[jobid][digest]['size'] = layer_size
self.queues[jobid][digest]['status'] = self.DL_GZ_DOWNLOADING
resp = await async_write_file_stream(url, save_path, timeout=self.TIMEOUT_DL_LAYER + \
self.get_additional_time_downlaod(layer_size),
retry=self.RETRY,
queue=self.queues[jobid][digest]['queue'],
Authorization=requirements)
if resp.status != 200:
raise DockerUtil.LayerDownloadFailed("Failed to download layer. code: %d" % resp.status)
@@ -640,6 +650,17 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
# HEAD request to get size
resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT,
retry=self.RETRY, method='HEAD')
if resp.status >= 400:
raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. code: %d"%(url, resp.status))
if not resp.content:
raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. Response does not have a body."%url)
if not resp.headers.get('content-length'):
raise DockerUtil.LayerDownloadFailed(
"Failed to download layer from %s. "%url +
"Response headers do not have content-length.")
layer_size = int(resp.headers.get('content-length'))
self.queues[jobid][digest]['size'] = layer_size
@@ -649,6 +670,8 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
self.get_additional_time_downlaod(layer_size),
retry=self.RETRY,
queue=self.queues[jobid][digest]['queue'])
if resp.status != 200:
raise DockerUtil.LayerDownloadFailed("Failed to download layer. code: %d" % resp.status)
self.logger.debug("downloaded layer %s to %s", digest, save_path)
self.queues[jobid][digest]['status'] = self.DL_FINISH
@@ -936,17 +959,19 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
# get manifest
manifest = await self.fetch_docker_image_manifest(ref['domain'], ref['repo'],
manifest_digest)
schema_v = manifest['schemaVersion']
try:
manifest = await self.fetch_docker_image_manifest(ref['domain'], ref['repo'],
manifest_digest)
except DockerUtil.FetchManifestFailed as err:
raise DockerUtil.ManifestError(str(err))
if schema_v == 1:
if manifest['schemaVersion'] == 1:
# pull layers and create config from version 1 manifest
config_json_str, config_digest, _ = await self.fetch_config_schema_v1(
ref, manifest, jobid
)
elif schema_v == 2:
elif manifest['schemaVersion'] == 2:
if manifest['mediaType'] == 'application/vnd.docker.distribution.manifest.v2+json':
# pull layers using version 2 manifest
config_json_str, config_digest, _ = await self.fetch_config_schema_v2(
@@ -955,7 +980,7 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
else:
raise DockerUtil.ManifestError('Invalid media type: %s' % manifest['mediaType'])
else:
raise DockerUtil.ManifestError('Invalid schema version: %d' % schema_v)
raise DockerUtil.ManifestError('Invalid schema version: %d' % manifest['schemaVersion'])
manifestlist_str = json.dumps(manifestlist, indent=3)
repo_digest = add_idpref(hashlib.sha256(manifestlist_str.encode('utf-8')).hexdigest())
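The pattern in this hunk (and in the one at the end of the file) is wrap-and-translate: FetchManifestFailed from the registry client is re-raised as ManifestError, and manifest['schemaVersion'] is read only after the fetch succeeds, whereas the old code assigned schema_v right after an unguarded fetch. In isolation (exception bodies abbreviated, fetch stubbed to always fail):

import asyncio

class FetchManifestFailed(Exception):
    pass

class ManifestError(Exception):
    pass

async def fetch_manifest():
    # Illustrative stand-in for DockerUtil.fetch_docker_image_manifest
    raise FetchManifestFailed("404 from registry")

async def pull_manifest():
    try:
        manifest = await fetch_manifest()
    except FetchManifestFailed as err:
        # Callers only ever need to handle ManifestError.
        raise ManifestError(str(err))
    if manifest['schemaVersion'] not in (1, 2):
        raise ManifestError('Invalid schema version: %d' % manifest['schemaVersion'])
    return manifest

try:
    asyncio.get_event_loop().run_until_complete(pull_manifest())
except ManifestError as err:
    print("translated:", err)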
@@ -1054,8 +1079,11 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
ref = normalize_ref(tag, index=True)
# get manifest
manifest = await self.fetch_docker_image_manifest(
ref['domain'], ref['repo'], ref['suffix'])
try:
manifest = await self.fetch_docker_image_manifest(
ref['domain'], ref['repo'], ref['suffix'])
except DockerUtil.FetchManifestFailed as err:
raise DockerUtil.ManifestError(str(err))
schema_v = manifest['schemaVersion']
#!/usr/bin/env bats
@test "pull alpine from other node (with '--noprogress' option)" {
run python -m beiran docker image pull alpine --noprogress
@test "pulling an image from another node (with '--no-progress' option) exits peacefully" {
run python -m beiran docker image pull alpine --no-progress
[ "$status" -eq 0 ]
}
@test "pulling a non-existing image (with '--no-progress' option) exits with error code" {
run python -m beiran docker image pull a_non_existent_image --no-progress
[ "$status" -eq 1 ]
}
#!/usr/bin/env bats
@test "pull alpine from other node (with '--wait' option)" {
run python -m beiran docker image pull alpine --wait
@test "pulling an image from another node (with '--wait' and '--no-progress' options) exits peacefully" {
run python -m beiran docker image pull alpine --wait --no-progress
[ "$status" -eq 0 ]
}
@test "pulling a non-existing image (with '--wait' and '--no-progress' options) exits with error code" {
run python -m beiran docker image pull a_non_existent_image --wait --no-progress
[ "$status" -eq 1 ]
}
#!/usr/bin/env bats
@test "pull alpine from other node" {
@test "pulling an image from another node exits peacefully" {
run python -m beiran docker image pull alpine
[ "$status" -eq 0 ]
}
@test "pulling a non-existing image exits with error code" {
run python -m beiran docker image pull a_non_existent_image
[ "$status" -eq 1 ]
}