Skip to content

Commit c9df8df

Browse files
authored
feat: Add get_image_scanning_results method to SdScanningClient (#171)
This method will allow to retrieve the scanning results for all the policies, or just the specified one.
1 parent 5e98b31 commit c9df8df

File tree

2 files changed

+140
-0
lines changed

2 files changed

+140
-0
lines changed

sdcclient/_scanning.py

+80
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
import json
33
import re
44
import time
5+
from datetime import datetime
56
from warnings import warn
67

8+
from requests.exceptions import RetryError
79
from requests_toolbelt.multipart.encoder import MultipartEncoder
810

911
try:
@@ -1224,3 +1226,81 @@ def download_cve_report_csv(self, vuln_type="os", scope_type="static"):
12241226
return [False, self.lasterr]
12251227

12261228
return [True, res.content.decode("utf-8")]
1229+
1230+
def get_image_scanning_results(self, image_name, policy_id=None):
1231+
'''
1232+
Args:
1233+
image_name (str): Image name to retrieve the scanning results from
1234+
policy_id (str): Policy ID to check against. If not specified, will check against all policies.
1235+
1236+
Returns:
1237+
A tuple of (bool, str).
1238+
The first parameter, if true, means that the result is correct, while
1239+
if false, means that there's been an error. The second parameter
1240+
will hold the response of the API call.
1241+
'''
1242+
try:
1243+
ok, res = self.get_image(image_name)
1244+
if not ok:
1245+
return ok, res
1246+
1247+
image_digest = res[0]["imageDigest"]
1248+
image_tag = res[0]["image_detail"][0]["fulltag"]
1249+
except RetryError:
1250+
return [False, "could not retrieve image digest for the given image name, "
1251+
"ensure that the image has been scanned"]
1252+
1253+
url = f"{self.url}/api/scanning/v1/images/{image_digest}/policyEvaluation"
1254+
params = {
1255+
"tag": image_tag,
1256+
}
1257+
1258+
res = self.http.get(url, headers=self.hdrs, params=params, verify=self.ssl_verify)
1259+
if not self._checkResponse(res):
1260+
return [False, self.lasterr]
1261+
1262+
json_res = res.json()
1263+
1264+
result = {
1265+
"image_digest": json_res["imageDigest"],
1266+
"image_id": json_res["imageId"],
1267+
"status": json_res["status"],
1268+
"image_tag": image_tag,
1269+
"total_stop": json_res["nStop"],
1270+
"total_warn": json_res["nWarn"],
1271+
"last_evaluation": datetime.utcfromtimestamp(json_res["at"]),
1272+
"policy_id": "*",
1273+
"policy_name": "All policies",
1274+
"warn_results": [],
1275+
"stop_results": []
1276+
}
1277+
1278+
if policy_id:
1279+
policy_results = [result for result in json_res["results"] if result["policyId"] == policy_id]
1280+
if policy_results:
1281+
filtered_result_by_policy_id = policy_results[0]
1282+
result["total_stop"] = filtered_result_by_policy_id["nStop"]
1283+
result["total_warn"] = filtered_result_by_policy_id["nWarn"]
1284+
result["warn_results"] = [rule_result["checkOutput"]
1285+
for gate_result in filtered_result_by_policy_id["gateResults"]
1286+
for rule_result in gate_result["ruleResults"]
1287+
if rule_result["gateAction"] == "warn"]
1288+
result["stop_results"] = [rule_result["checkOutput"]
1289+
for gate_result in filtered_result_by_policy_id["gateResults"]
1290+
for rule_result in gate_result["ruleResults"]
1291+
if rule_result["gateAction"] == "stop"]
1292+
else:
1293+
return [False, "the specified policy ID doesn't exist"]
1294+
else:
1295+
result["warn_results"] = [rule_result["checkOutput"]
1296+
for result in json_res["results"]
1297+
for gate_result in result["gateResults"]
1298+
for rule_result in gate_result["ruleResults"]
1299+
if rule_result["gateAction"] == "warn"]
1300+
result["stop_results"] = [rule_result["checkOutput"]
1301+
for result in json_res["results"]
1302+
for gate_result in result["gateResults"]
1303+
for rule_result in gate_result["ruleResults"]
1304+
if rule_result["gateAction"] == "stop"]
1305+
1306+
return [True, result]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import os
2+
from datetime import datetime
3+
4+
from expects import equal, expect, be_empty, be_above_or_equal, be_an, have_keys, not_
5+
from mamba import before, context, description, it
6+
7+
from sdcclient import SdScanningClient
8+
from specs import be_successful_api_call
9+
10+
with description("Policy Evaluation", "integration") as self:
11+
with before.all:
12+
self.client = SdScanningClient(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
13+
token=os.getenv("SDC_SECURE_TOKEN"))
14+
15+
with it("is able to retrieve the results for all the policies"):
16+
image_name = "alpine:latest"
17+
ok, res = self.client.get_image_scanning_results(image_name)
18+
19+
expect((ok, res)).to(be_successful_api_call)
20+
expect(res).to(
21+
have_keys("image_digest", "image_id", "stop_results",
22+
total_warn=be_above_or_equal(0), total_stop=be_above_or_equal(0),
23+
last_evaluation=be_an(datetime),
24+
status="pass", image_tag="docker.io/alpine:latest",
25+
policy_id="*", policy_name="All policies",
26+
warn_results=not_(be_empty))
27+
)
28+
29+
with it("is able to retrieve the results for the default policy"):
30+
image_name = "alpine:latest"
31+
policy_id = "default"
32+
ok, res = self.client.get_image_scanning_results(image_name, policy_id)
33+
34+
expect((ok, res)).to(be_successful_api_call)
35+
expect(res).to(
36+
have_keys("image_digest", "image_id", "stop_results",
37+
total_warn=be_above_or_equal(0), total_stop=be_above_or_equal(0),
38+
last_evaluation=be_an(datetime),
39+
status="pass", image_tag="docker.io/alpine:latest",
40+
policy_id="*", policy_name="All policies",
41+
warn_results=not_(be_empty))
42+
)
43+
44+
with context("but the image has not been scanned yet"):
45+
with it("returns an error saying that the image has not been found"):
46+
image_name = "unknown_image"
47+
ok, res = self.client.get_image_scanning_results(image_name)
48+
49+
expect((ok, res)).to_not(be_successful_api_call)
50+
expect(res).to(equal("could not retrieve image digest for the given image name, "
51+
"ensure that the image has been scanned"))
52+
53+
with context("but the provided policy id does not exist"):
54+
with it("returns an error saying that the policy id is not found"):
55+
image_name = "alpine"
56+
policy_id = "unknown_policy_id"
57+
ok, res = self.client.get_image_scanning_results(image_name, policy_id)
58+
59+
expect((ok, res)).to_not(be_successful_api_call)
60+
expect(res).to(equal("the specified policy ID doesn't exist"))

0 commit comments

Comments
 (0)