Handle business exceptions (for example, invalid data)
One more scenario to handle: what to do with invalid data? We should at least log a message in case we have some data that doesn't pass our validate_traffic_data() check.
@task
def consume_traffic_data():
"""
Inhuman Insurance, Inc. Artificial Intelligence System robot.
Consumes traffic data work items.
"""
process_traffic_data()
def process_traffic_data():
for item in workitems.inputs:
traffic_data = item.payload["traffic_data"]
if len(traffic_data["country"]) == 3:
status, return_json = post_traffic_data_to_sales_system(traffic_data)
if status == 200:
item.done()
else:
item.fail(
exception_type="APPLICATION",
code="TRAFFIC_DATA_POST_FAILED",
message=return_json["message"],
)
else:
item.fail(
exception_type="BUSINESS",
code="INVALID_TRAFFIC_DATA",
message=item.payload,
)
def post_traffic_data_to_sales_system(traffic_data):
url = "https://robocorp.com/inhuman-insurance-inc/sales-system-api"
response = requests.post(url, json=traffic_data)
return response.status_code, response.json()
- You added an extra else, to the check regarding the validity of the data, moved the status related logic under the previous if to take care of dealing with invalid traffic data.
It logs a message, including the invalid data.
The work item is released as
FAILED
. This time the exception type isBUSINESS
. The code can be any custom code. You choose to useINVALID_TRAFFIC_DATA
. The exception type and the code help filter the failed work items in Control Room.
After running the consumer task, you see the following log entry regarding the invalid data (due to the country code not matching the validation rule):
Releasing item '1' with FAILED state and exception: {'type': 'BUSINESS', 'code': 'INVALID_TRAFFIC_DATA', 'message': {'traffic_data': {'country': 'SWError', 'year': 2019, 'rate': 3.13947}}}
You notice that the entire logic is in process_traffic_data()
and in our @task
function we only call this one function.
You know what you have to do. That's right, some more refactoring!
Your consumer logic is now complete!
The consumer:
- loops all the work items one by one.
- validates the data.
- posts the data to the sales system API.
- handles successful responses.
- handles application exceptions.
- handles business exceptions.
Your complete code now looks like:
import requests
from robocorp import workitems
from robocorp.tasks import task
from RPA.HTTP import HTTP
from RPA.JSON import JSON
from RPA.Tables import Tables
http = HTTP()
json = JSON()
table = Tables()
TRAFFIC_JSON_FILE_PATH = "output/traffic.json"
# JSON data keys
COUNTRY_KEY = "SpatialDim"
YEAR_KEY = "TimeDim"
RATE_KEY = "NumericValue"
GENDER_KEY = "Dim1"
@task
def produce_traffic_data():
"""
Inhuman Insurance, Inc. Artificial Intelligence System automation.
Produces traffic data work items.
"""
http.download(
url="https://github.com/robocorp/inhuman-insurance-inc/raw/main/RS_198.json",
target_file=TRAFFIC_JSON_FILE_PATH,
overwrite=True,
)
traffic_data = load_traffic_data_as_table()
filtered_data = filter_and_sort_traffic_data(traffic_data)
filtered_data = get_latest_data_by_country(filtered_data)
payloads = create_work_item_payloads(filtered_data)
save_work_item_payloads(payloads)
@task
def consume_traffic_data():
"""
Inhuman Insurance, Inc. Artificial Intelligence System automation.
Consumes traffic data work items.
"""
for item in workitems.inputs:
traffic_data = item.payload["traffic_data"]
if len(traffic_data["country"]) == 3:
status, return_json = post_traffic_data_to_sales_system(traffic_data)
if status == 200:
item.done()
else:
item.fail(
exception_type="APPLICATION",
code="TRAFFIC_DATA_POST_FAILED",
message=return_json["message"],
)
else:
item.fail(
exception_type="BUSINESS",
code="INVALID_TRAFFIC_DATA",
message=item.payload,
)
def post_traffic_data_to_sales_system(traffic_data):
url = "https://robocorp.com/inhuman-insurance-inc/sales-system-api"
response = requests.post(url, json=traffic_data)
return response.status_code, response.json()
def load_traffic_data_as_table():
json_data = json.load_json_from_file(TRAFFIC_JSON_FILE_PATH)
return table.create_table(json_data["value"])
def filter_and_sort_traffic_data(data):
max_rate = 5.0
both_genders = "BTSX"
table.filter_table_by_column(data, RATE_KEY, "<", max_rate)
table.filter_table_by_column(data, GENDER_KEY, "==", both_genders)
table.sort_table_by_column(data, YEAR_KEY, False)
return data
def get_latest_data_by_country(data):
data = table.group_table_by_column(data, COUNTRY_KEY)
latest_data_by_country = []
for group in data:
first_row = table.pop_table_row(group)
latest_data_by_country.append(first_row)
return latest_data_by_country
def create_work_item_payloads(traffic_data):
payloads = []
for row in traffic_data:
payload = dict(
country=row[COUNTRY_KEY],
year=row[YEAR_KEY],
rate=row[RATE_KEY],
)
payloads.append(payload)
return payloads
def save_work_item_payloads(payloads):
for payload in payloads:
variables = dict(traffic_data=payload)
workitems.outputs.create(variables)
All done. The producer produces, and the consumer consumes. Bravissimo! 👏