docker based ci tool
Revisão | b459f117df9e712ec738f76339191854a7ff9a63 (tree) |
---|---|
Hora | 2019-05-31 03:41:46 |
Autor | hylom <hylom@user...> |
Commiter | hylom |
core-server: use separate thread to run and watch container
@@ -4,6 +4,9 @@ import os | ||
4 | 4 | import re |
5 | 5 | import pathlib |
6 | 6 | import time |
7 | +import concurrent.futures | |
8 | +import json | |
9 | + | |
7 | 10 | import grpc |
8 | 11 | from concurrent import futures |
9 | 12 |
@@ -19,18 +22,63 @@ _ONE_DAY_IN_SECONDS = 60 * 60 * 24 | ||
19 | 22 | |
20 | 23 | OUTPUT_DIR = './results' |
21 | 24 | |
25 | +executor = concurrent.futures.ThreadPoolExecutor(max_workers=2) | |
26 | + | |
27 | +def run_task(task_name, task): | |
28 | + print("run task: '{}'".format(task_name)) | |
29 | + | |
30 | + status = { | |
31 | + "succeeded": False, | |
32 | + "container_status": "Running" | |
33 | + }; | |
34 | + reporter = FileReporter(task_name, status) | |
35 | + reporter.write_status() | |
36 | + | |
37 | + image = task["image"] | |
38 | + client = docker.from_env() | |
39 | + container = client.containers.run(image=image, detach=True) | |
40 | + container.wait(timeout=task.get("timeout", 60)) | |
41 | + | |
42 | + print("task done.") | |
43 | + | |
44 | + lines = [] | |
45 | + for line in container.logs(stream=True): | |
46 | + lines.append(line) | |
47 | + logs = b"".join(lines) | |
48 | + | |
49 | + if task.get("auto_remove", True): | |
50 | + container.remove() | |
51 | + print("container removed.") | |
52 | + | |
53 | + reporter.set_log(logs) | |
54 | + status["container_status"] = "Done" | |
55 | + reporter.write_status() | |
56 | + reporter.write_log() | |
57 | + | |
58 | + print("done.") | |
59 | + return | |
60 | + | |
61 | + | |
22 | 62 | class FileReporter(): |
23 | - def __init__(self, task_name, report, logs): | |
63 | + def __init__(self, task_name, status, log=None): | |
24 | 64 | self.task_name = task_name |
25 | - self.report = report | |
26 | - self.logs = logs | |
65 | + self.status = status | |
66 | + self.log = log | |
27 | 67 | self.output_basedir = os.path.join(OUTPUT_DIR, task_name) |
68 | + self.output_dir = self._get_output_directory() | |
69 | + | |
70 | + def set_log(self, log): | |
71 | + self.log = log | |
28 | 72 | |
29 | - def write(self): | |
30 | - output_dir = self._get_output_directory() | |
31 | - log_pathname = output_dir / "logs.txt" | |
32 | - with log_pathname.open(mode='wb') as f: | |
33 | - f.write(self.logs) | |
73 | + def write_log(self): | |
74 | + pn = self.output_dir / "log.txt" | |
75 | + with pn.open(mode='wb') as f: | |
76 | + f.write(self.log) | |
77 | + | |
78 | + def write_status(self): | |
79 | + pn = self.output_dir / "status.json" | |
80 | + with pn.open(mode='w') as f: | |
81 | + json.dump(self.status, f) | |
34 | 82 | |
35 | 83 | def _get_output_directory(self): |
36 | 84 | if not os.path.exists(self.output_basedir): |
@@ -57,12 +105,10 @@ class DockRun(DockRunServicer): | ||
57 | 105 | message = "task '{}' not found.".format(req.task_name) |
58 | 106 | return RunTaskReply(is_succeed=False, message=message) |
59 | 107 | |
60 | - logs = self._run_task(task) | |
61 | - report = { | |
62 | - "succeed": True, | |
63 | - }; | |
64 | - reporter = FileReporter(req.task_name, report, logs) | |
65 | - reporter.write() | |
108 | + # generate process | |
109 | + #self._run_task(task) | |
110 | + executor.submit(run_task, req.task_name, task) | |
111 | + | |
66 | 112 | return RunTaskReply(is_succeed=True, message="") |
67 | 113 | |
68 | 114 | def _run_task(self, task): |
@@ -79,7 +125,15 @@ class DockRun(DockRunServicer): | ||
79 | 125 | if task.get("auto_remove", True): |
80 | 126 | container.remove() |
81 | 127 | |
82 | - return logs | |
128 | + report = { | |
129 | + "succeeded": False, | |
130 | + "status": None, | |
131 | + }; | |
132 | + | |
133 | + reporter = FileReporter(req.task_name, report, logs) | |
134 | + reporter.write() | |
135 | + | |
136 | + return | |
83 | 137 | |
84 | 138 | def main(): |
85 | 139 | server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) |