Commit dde33a05 authored by Furkan Mustafa

Merge branch 'a-little-enhancement-docker' into 'dev-0.0.9'

a little enhancement for docker

See merge request !31
parents 77d04b26 2064e794
@@ -21,14 +21,16 @@
 """Beiran Library"""
 from typing import Tuple

+import asyncio
 import aiohttp
 import async_timeout

 from beiran.util import input_reader


-async def async_req(url: str, return_json: bool = True,
-                    timeout: int = 3, method: str = "GET",
+async def async_req(url: str, return_json: bool = True,  # pylint: disable=too-many-arguments
+                    timeout: int = 3, retry: int = 1,
+                    retry_interval: int = 2, method: str = "GET",
                     **kwargs) -> Tuple[aiohttp.client_reqrep.ClientResponse, dict]:
     """
     Async http get with aiohttp
@@ -47,18 +49,23 @@ async def async_req(url: str, return_json: bool = True,
     data = kwargs.pop('data', None)
     headers = kwargs

-    async with aiohttp.ClientSession() as session:
-        async with async_timeout.timeout(timeout):
-            async with session.request(method, url, json=json,
-                                       data=data, headers=headers) as resp:
-                if return_json:
-                    data = await resp.json(content_type=None)
-                    return resp, data
-                return resp, {}
+    for _ in range(retry):
+        try:
+            async with aiohttp.ClientSession() as session:
+                async with async_timeout.timeout(timeout):
+                    async with session.request(method, url, json=json,
+                                               data=data, headers=headers) as resp:
+                        if return_json:
+                            data = await resp.json(content_type=None)
+                            return resp, data
+                        return resp, {}
+        except asyncio.TimeoutError:
+            await asyncio.sleep(retry_interval)
+    raise asyncio.TimeoutError


-async def async_write_file_stream(url: str, save_path: str, queue=None,
-                                  timeout: int = 3, method: str = "GET",
+async def async_write_file_stream(url: str, save_path: str, queue=None,  # pylint: disable=too-many-arguments,too-many-locals
+                                  timeout: int = 3, retry: int = 1,
+                                  retry_interval: int = 2, method: str = "GET",
                                   **kwargs) -> aiohttp.client_reqrep.ClientResponse:
     """
     Async write a stream to a file
@@ -76,16 +83,21 @@ async def async_write_file_stream(url: str, save_path: str, queue=None,
     data = kwargs.pop('data', None)
     headers = kwargs

-    async with aiohttp.ClientSession() as session:
-        async with async_timeout.timeout(timeout):
-            async with session.request(method, url, json=json,
-                                       data=data, headers=headers) as resp:
-                with open(save_path, 'wb')as file:
-                    async for chunk in input_reader(resp.content):
-                        file.write(chunk)
-                        if queue:
-                            queue.put_nowait(chunk)
-    if queue:
-        queue.put_nowait(None)
-    return resp
+    for _ in range(retry):
+        try:
+            async with aiohttp.ClientSession() as session:
+                async with async_timeout.timeout(timeout):
+                    async with session.request(method, url, json=json,
+                                               data=data, headers=headers) as resp:
+                        with open(save_path, 'wb')as file:
+                            async for chunk in input_reader(resp.content):
+                                file.write(chunk)
+                                if queue:
+                                    queue.put_nowait(chunk)
+            if queue:
+                queue.put_nowait(None)
+            return resp
+        except asyncio.TimeoutError:
+            await asyncio.sleep(retry_interval)
+    raise asyncio.TimeoutError
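The new retry knobs can be exercised on their own. Below is a minimal sketch of a caller, assuming the helper is importable as `beiran.lib.async_req`; the URL and port are placeholders, not part of this change:

```python
# Hypothetical caller of the retried helper (import path and URL are assumptions).
import asyncio

from beiran.lib import async_req


async def probe():
    # Up to 3 attempts, each capped at 5 seconds, with a 2-second pause between attempts.
    # If every attempt times out, asyncio.TimeoutError propagates to the caller.
    resp, data = await async_req("http://localhost:8888/info",
                                 timeout=5, retry=3, retry_interval=2)
    print(resp.status, data)

asyncio.get_event_loop().run_until_complete(probe())
```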
@@ -205,15 +205,26 @@ def layer():
 @layer.command('list')
 @click.option('--all', 'all_nodes', default=False, is_flag=True,
               help='List layers from all known nodes')
+@click.option('--diffid', default=False, is_flag=True,
+              help="Show layer's diffid")
 @click.option('--node', default=None,
               help='List layers from specific node')
 @click.pass_obj
 @pass_context
-def layer_list(ctx, all_nodes: bool, node: str):
+def layer_list(ctx, all_nodes: bool, diffid: bool, node: str):
     """List container layers across the cluster"""
     layers = ctx.beiran_client.get_layers(all_nodes=all_nodes, node_uuid=node)
-    table = [
-        [i['digest'], sizeof_fmt(i['size']), str(len(i['available_at'])) + ' node(s)']
-        for i in layers
-    ]
-    print(tabulate(table, headers=["Digest", "Size", "Availability"]))
+    if diffid:
+        table = [
+            [i['digest'], i['diff_id'], sizeof_fmt(i['size']), str(len(i['available_at'])) +
+             ' node(s)']
+            for i in layers
+        ]
+        headers = ["Digest", "DiffID", "Size", "Availability"]
+    else:
+        table = [
+            [i['digest'], sizeof_fmt(i['size']), str(len(i['available_at'])) + ' node(s)']
+            for i in layers
+        ]
+        headers = ["Digest", "Size", "Availability"]
+    print(tabulate(table, headers=headers))
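The only behavioural change in the CLI is the extra DiffID column. A throwaway illustration of the two output shapes, using made-up digest and size strings rather than real cluster data:

```python
# Placeholder rows only; real values come from ctx.beiran_client.get_layers().
from tabulate import tabulate

rows = [["sha256:94b2db70...", "sha256:8a788232...", "39.2 MiB", "2 node(s)"]]

# with --diffid
print(tabulate(rows, headers=["Digest", "DiffID", "Size", "Availability"]))

# without --diffid (DiffID column dropped)
print(tabulate([[r[0], r[2], r[3]] for r in rows],
               headers=["Digest", "Size", "Availability"]))
```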
@@ -110,6 +110,13 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
     # event datas for downloading layers
     EVENT_START_LAYER_DOWNLOAD = "start_layer_download"

+    # consts related with timeout
+    TIMEOUT = 10  # second
+    TIMEOUT_DL_MANIFEST = 10
+    TIMEOUT_DL_CONFIG = 10
+    TIMEOUT_DL_LAYER = 30
+    RETRY = 2
+
     def __init__(self, cache_dir: str, storage: str,  # pylint: disable=too-many-arguments
                  aiodocker: Docker = None, logger: logging.Logger = None,
                  local_node: Node = None) -> None:
@@ -163,6 +170,11 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
         if not os.path.isdir(self.layer_cache_path): # cache path for layer
             os.makedirs(self.layer_cache_path)

+    @staticmethod
+    def get_additional_time_downlaod(size: int) -> int:
+        """Get additional time to download something"""
+        return size // 5000000
+
     def docker_find_layer_dir_by_digest(self, layer_digest: str):
         """
         try to find local layer directory containing tar archive
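The new static method budgets roughly one extra second per 5 MB of payload, on top of the fixed timeout constants. A worked example with an assumed 100 MB layer:

```python
# Illustrative arithmetic only; 100 MB is an assumed layer size.
TIMEOUT_DL_LAYER = 30
layer_size = 100000000                         # ~100 MB
additional = layer_size // 5000000             # 100000000 // 5000000 == 20
total_timeout = TIMEOUT_DL_LAYER + additional  # 30 + 20 == 50 seconds per attempt
```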
@@ -471,13 +483,15 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
         # try to access the server with HEAD requests
         # there is a purpose to check the type of authentication
-        resp, _ = await async_req(url=url, return_json=False, timeout=10, method='HEAD')
+        resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT,
+                                  retry=self.RETRY, method='HEAD')

         if resp.status == 401 or resp.status == 200:
             if resp.status == 401:
                 requirements = await self.get_auth_requirements(resp.headers, **kwargs)

         resp, manifest = await async_req(url=url, return_json=True, Authorization=requirements,
+                                         timeout=self.TIMEOUT_DL_MANIFEST, retry=self.RETRY,
                                          Accept=schema_v2_header)
         if resp.status != 200:
@@ -491,7 +505,8 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
         Get Bearer token from auth.docker.io
         """
         _, data = await async_req(
-            "{}?service={}&scope={}".format(realm, service, scope)
+            "{}?service={}&scope={}".format(realm, service, scope),
+            timeout=self.TIMEOUT, retry=self.RETRY,
         )
         token = data['token']
         return token
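For reference, with the realm and service values Docker Hub advertises in its WWW-Authenticate header, the formatted URL comes out as below (the repository in the scope is just an example):

```python
realm = "https://auth.docker.io/token"
service = "registry.docker.io"
scope = "repository:library/alpine:pull"   # example repository
url = "{}?service={}&scope={}".format(realm, service, scope)
# https://auth.docker.io/token?service=registry.docker.io&scope=repository:library/alpine:pull
```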
@@ -548,21 +563,24 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
         # try to access the server with HEAD requests
         # there is a purpose to check the type of authentication
-        resp, _ = await async_req(url=url, return_json=False, method='HEAD')
+        resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT,
+                                  retry=self.RETRY, method='HEAD')

         if resp.status == 401 or resp.status == 200:
             if resp.status == 401:
                 requirements = await self.get_auth_requirements(resp.headers, **kwargs)

         # HEAD request for get size
-        resp, _ = await async_req(url=url, return_json=False, method='HEAD',
-                                  Authorization=requirements)
+        resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT,
+                                  retry=self.RETRY, method='HEAD', Authorization=requirements)
         layer_size = int(resp.headers.get('content-length'))

         self.queues[jobid][layer_digest]['size'] = layer_size
         self.queues[jobid][layer_digest]['status'] = self.DL_GZ_DOWNLOADING

-        resp = await async_write_file_stream(url, save_path, timeout=60,
+        resp = await async_write_file_stream(url, save_path, timeout=self.TIMEOUT_DL_LAYER + \
+                                             self.get_additional_time_downlaod(layer_size),
+                                             retry=self.RETRY,
                                              queue=self.queues[jobid][layer_digest]['queue'],
                                              Authorization=requirements)
@@ -583,13 +601,16 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
         self.logger.debug("downloading layer from %s", url)

         # HEAD request to get size
-        resp, _ = await async_req(url=url, return_json=False, method='HEAD')
+        resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT,
+                                  retry=self.RETRY, method='HEAD')
         layer_size = int(resp.headers.get('content-length'))

         self.queues[jobid][digest]['size'] = layer_size
         self.queues[jobid][digest]['status'] = self.DL_TAR_DOWNLOADING

-        resp = await async_write_file_stream(url, save_path, timeout=60,
+        resp = await async_write_file_stream(url, save_path, timeout=self.TIMEOUT_DL_LAYER + \
+                                             self.get_additional_time_downlaod(layer_size),
+                                             retry=self.RETRY,
                                              queue=self.queues[jobid][digest]['queue'])
         self.logger.debug("downloaded layer %s to %s", digest, save_path)
         self.queues[jobid][digest]['status'] = self.DL_FINISH
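Worth noting: async_write_file_stream applies the timeout per attempt and reopens the target file in 'wb' mode on each retry, so a timed-out transfer restarts from zero rather than resuming. A rough worst-case budget for the call above, assuming a 250 MB layer:

```python
# Assumed 250 MB layer; shows how the per-attempt budget scales with size.
RETRY = 2
TIMEOUT_DL_LAYER = 30
layer_size = 250000000
per_attempt = TIMEOUT_DL_LAYER + layer_size // 5000000  # 30 + 50 == 80 seconds
worst_case = RETRY * per_attempt                        # ~160 seconds, plus retry_interval sleeps
```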
@@ -686,13 +707,15 @@ class DockerUtil: # pylint: disable=too-many-instance-attributes
         # try to access the server with HEAD requests
         # there is a purpose to check the type of authentication
-        resp, _ = await async_req(url=url, return_json=False, timeout=10, method='HEAD')
+        resp, _ = await async_req(url=url, return_json=False, timeout=self.TIMEOUT,
+                                  retry=self.RETRY, method='HEAD')

         if resp.status == 401 or resp.status == 200:
             if resp.status == 401:
                 requirements = await self.get_auth_requirements(resp.headers, **kwargs)

-        resp, _ = await async_req(url=url, Authorization=requirements)
+        resp, _ = await async_req(url=url, timeout=self.TIMEOUT_DL_CONFIG,
+                                  retry=self.RETRY, Authorization=requirements)

         if resp.status != 200:
             raise DockerUtil.ConfigDownloadFailed("Failed to download config. code: %d"