Examples of Python Functions
The real world is too complex for us to cover every case, but here are some examples of how to use Python functions in real-world scenarios.
Want to contribute?
Send us an email at software@layrz.com with your example and we will add it to the list.
GET_SENSOR LCL Formula equivalence
Let's start simple. The GET_SENSOR function in LCL is useful for getting the value of a sensor, but how can we do the same thing in Python?
This example is divided into two steps: the first creates the direct equivalent of GET_SENSOR, and the second creates a more complex function that combines the GET_PARAM, CONCAT and PRIMARY_DEVICE functions.
First example - GET_SENSOR equivalence
def calculate(message, previous_message, asset):
    # We are assuming that you know the parameter name/slug, for this example, we will use "tank.temperature"
    return message.sensors.get("tank.temperature")

# And that's it, it's that simple, right?
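To try the function outside Layrz, a stand-in message object is enough. The sketch below assumes only that `message.sensors` behaves like a dict, as the example above implies. `SimpleNamespace` is a hypothetical stub used for illustration, not a Layrz class.

```python
from types import SimpleNamespace


def calculate(message, previous_message, asset):
    return message.sensors.get("tank.temperature")


# Hypothetical stand-ins for Layrz messages
message = SimpleNamespace(sensors={"tank.temperature": 21.5})
empty = SimpleNamespace(sensors={})

reading = calculate(message, None, None)
missing = calculate(empty, None, None)
print(reading, missing)
```

Because `.get()` is used instead of indexing, a message without the sensor yields `None` rather than raising a `KeyError`.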
Second example - GET_PARAM, CONCAT and PRIMARY_DEVICE equivalence
def calculate(message, previous_message, asset):
    # First of all, we need to get the primary device of the asset
    # To get that, we iterate the devices of the asset and check if each device is the primary one
    primary_device = None
    for device in asset.devices:
        if device.is_primary:
            primary_device = device
            break
    # Because some assets don't have a primary device, we need to check if the primary device is None
    if primary_device is None:
        # If the primary device is None, we cannot get the parameter, so we return None
        return None
    # Then, we need to get the parameter value
    for key, value in message.payload.items():
        # We are assuming that you know the parameter name/slug, for this example, we will use "tank.temperature"
        # So, if the combination of the primary device ident and the parameter name is equal to the key, we return the value
        if key == f'{primary_device.ident}.tank.temperature':
            return value
    # Otherwise, we return None
    return None
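The same lookup can be written more compactly. The following is a minimal sketch, assuming `asset.devices` is iterable and `message.payload` is a dict, as the example above implies. The helper name `get_primary_param` and the `SimpleNamespace` stubs are hypothetical.

```python
from types import SimpleNamespace


def get_primary_param(message, asset, param):
    # next() returns the first primary device, or None when there is none
    primary = next((d for d in asset.devices if d.is_primary), None)
    if primary is None:
        return None
    # Build the key from the primary device ident and the parameter name, then look it up
    return message.payload.get(f"{primary.ident}.{param}")


# Hypothetical stand-ins for a Layrz asset and message
asset = SimpleNamespace(devices=[
    SimpleNamespace(ident="123", is_primary=False),
    SimpleNamespace(ident="456", is_primary=True),
])
message = SimpleNamespace(payload={
    "123.tank.temperature": 18.0,
    "456.tank.temperature": 30.0,
})

value = get_primary_param(message, asset, "tank.temperature")
no_primary = get_primary_param(message, SimpleNamespace(devices=[]), "tank.temperature")
print(value, no_primary)
```

A direct dict lookup avoids scanning every payload key, which may matter for messages with large payloads.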
Human readable time
In this example, the idea is simple: take message.received_at, convert it to the user's timezone (in this case, America/Panama) using the .astimezone() method, and then format it as human-readable text using the .strftime() method.
def calculate(message, previous_message, asset):
    # zoneinfo is part of the Python standard library; this example assumes it is already available in the function's scope
    # First of all, we need to define the user's timezone
    tz = zoneinfo.ZoneInfo("America/Panama")
    # Then, we need to convert the received time to the user's timezone
    received_at = message.received_at.astimezone(tz)
    # Finally, we need to convert the time to a human-readable format
    return received_at.strftime("%Y-%m-%d %H:%M:%S")
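As a quick check of the conversion, the sketch below formats a fixed UTC timestamp for America/Panama, which is UTC-5 all year. The `load_timezone` helper and its fixed-offset fallback are hypothetical additions for machines without a time zone database. They are not part of Layrz.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError


def load_timezone(name, fallback_hours):
    # zoneinfo reads the system time zone database (or the tzdata package)
    try:
        return ZoneInfo(name)
    except ZoneInfoNotFoundError:
        # Hypothetical fallback: a fixed UTC offset when the database is missing
        return timezone(timedelta(hours=fallback_hours))


tz = load_timezone("America/Panama", -5)
received_at = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
formatted = received_at.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S")
print(formatted)  # 2024-01-01 07:00:00
```

Note that `.astimezone()` on a naive datetime (one without `tzinfo`) interprets it as local system time, so this works as intended only if the timestamp is timezone-aware.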
Theoretical acceleration
Using the simple formula for acceleration (a = dv / dt), we can calculate the acceleration of an asset.
def calculate(message, previous_message, asset):
    # First of all, we need to check if the asset has a previous message
    if previous_message is None:
        # Because we don't have a previous message, we cannot calculate the acceleration, so we return 0
        return 0
    # Then, we define the two speeds
    v0 = previous_message.position.speed
    v1 = message.position.speed
    # If either speed is None, we cannot calculate the acceleration, so we return 0
    if v0 is None or v1 is None:
        return 0
    # Now, using the previous speed, we calculate the delta of speed
    dv = v1 - v0
    # We do the same thing but with the time
    dt = (message.received_at - previous_message.received_at).total_seconds()
    # If both messages share the same timestamp, the division is undefined, so we return 0
    if dt == 0:
        return 0
    # Finally, we calculate the acceleration, expressed in speed units per second
    return dv / dt
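The result of a = dv / dt is in speed units per second, so its meaning depends on the unit of the speed. The sketch below is a worked example that assumes the speed is reported in km/h (an assumption, so check your device's unit) and converts the result to m/s². The function name `acceleration_ms2` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def acceleration_ms2(v0_kmh, v1_kmh, t0, t1):
    # Elapsed time between the two readings, in seconds
    dt = (t1 - t0).total_seconds()
    if dt <= 0:
        # Identical or out-of-order timestamps: the acceleration is undefined
        return None
    # Convert the speed delta from km/h to m/s (assumes km/h input)
    dv_ms = (v1_kmh - v0_kmh) * 1000 / 3600
    return dv_ms / dt


t0 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
t1 = t0 + timedelta(seconds=5)

# From 36 km/h to 72 km/h in 5 seconds: dv = 10 m/s, so a = 2 m/s²
result = acceleration_ms2(36, 72, t0, t1)
same_time = acceleration_ms2(36, 72, t0, t0)
print(result, same_time)
```

Returning `None` for an undefined result is one possible design. Returning 0, as the example above does, keeps the sensor numeric at the cost of hiding the invalid case.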
Slope between two points
Using the latitude, longitude and altitude of the previous and current messages, we can calculate the slope between two points.
def calculate(message, previous_message, asset):
    # First of all, we need to check if the asset has a previous message
    if previous_message is None:
        # Because we don't have a previous message, we cannot calculate the slope, so we return 0
        return 0
    # Then, we define the two points
    lat0 = previous_message.position.latitude
    lon0 = previous_message.position.longitude
    alt0 = previous_message.position.altitude
    lat1 = message.position.latitude
    lon1 = message.position.longitude
    alt1 = message.position.altitude
    # If any of the values is None, we cannot calculate the slope, so we return 0
    if lat0 is None or lon0 is None or alt0 is None or lat1 is None or lon1 is None or alt1 is None:
        return 0
    # Now, define all of the deltas
    # Note: dx and dy are expressed in degrees while dz uses the altitude's unit,
    # so convert them to the same unit if you need a physically accurate angle
    dx = lat1 - lat0
    dy = lon1 - lon0
    dz = alt1 - alt0
    # Now, calculate the distance between the two points
    distance = math.sqrt(dx ** 2 + dy ** 2)
    # If the asset did not move horizontally, the slope is undefined, so we return 0
    if distance == 0:
        return 0
    # Now, calculate the slope using the distance and the altitude delta
    slope = dz / distance
    # Finally, return the slope in degrees
    return math.degrees(math.atan(slope))
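For a physically meaningful angle, the horizontal distance and the altitude delta need to share a unit. The sketch below is one way to do that, using the haversine formula to obtain the ground distance in meters. It assumes the altitude is reported in meters and uses a mean Earth radius of 6,371 km. The helper names are hypothetical.

```python
import math

EARTH_RADIUS_M = 6371000.0  # Mean Earth radius, in meters


def haversine_m(lat0, lon0, lat1, lon1):
    # Great-circle distance between two points given in decimal degrees
    phi0, phi1 = math.radians(lat0), math.radians(lat1)
    dphi = phi1 - phi0
    dlambda = math.radians(lon1 - lon0)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi0) * math.cos(phi1) * math.sin(dlambda / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def slope_degrees(lat0, lon0, alt0, lat1, lon1, alt1):
    distance = haversine_m(lat0, lon0, lat1, lon1)
    if distance == 0:
        # No horizontal movement: treat the slope as 0
        return 0.0
    # Altitude is assumed to be in meters, the same unit as the distance
    return math.degrees(math.atan2(alt1 - alt0, distance))


# 0.001 degrees of longitude on the equator is roughly 111 meters
ground = haversine_m(0.0, 0.0, 0.0, 0.001)
# Climbing as many meters as the ground distance gives a 45 degree slope
angle = slope_degrees(0.0, 0.0, 0.0, 0.0, 0.001, ground)
flat = slope_degrees(0.0, 0.0, 10.0, 0.0, 0.0, 10.0)
print(round(ground, 2), round(angle, 2), flat)
```

GPS altitude is often noisy, so over short distances the computed angle may swing widely. Filtering out very small ground distances is a common mitigation.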
Engine hours
Let's think about maintenance. Every engine requires maintenance after a certain number of hours of use, so how can we calculate the engine hours? With Layrz, everything is simple!
This example dives into accumulator-like sensors, which use their previous value to increment the engine hours.
def calculate(message, previous_message, asset):
    # First of all, we need to check if the asset has a previous message
    if previous_message is None:
        # Because we don't have a previous message, we cannot calculate the engine hours, so we return 0
        return 0
    # Now, we need to get the previous value of the accumulator-like sensor
    previous_value = previous_message.sensors.get("engine.hours", 0)
    # Then, we need to know if the engine is on
    engine_on = message.sensors.get("engine.ignition.status", False)
    if not engine_on:
        # If the engine is off, we don't need to calculate the engine hours, so we return the previous value
        return previous_value
    # Otherwise, we need to calculate the engine hours, so we calculate the delta of time
    dt = (message.received_at - previous_message.received_at).total_seconds()
    # Finally, we return the engine hours with the new time
    return previous_value + dt / 3600
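To see how the accumulator evolves, the sketch below replays four messages sent 30 minutes apart through a copy of the function above. It assumes that the computed value is stored back under `engine.hours` in the message's sensors. That behavior, along with the `SimpleNamespace` stubs, is a simplification for illustration.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def calculate(message, previous_message, asset):
    if previous_message is None:
        return 0
    previous_value = previous_message.sensors.get("engine.hours", 0)
    if not message.sensors.get("engine.ignition.status", False):
        return previous_value
    dt = (message.received_at - previous_message.received_at).total_seconds()
    return previous_value + dt / 3600


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
ignition = [True, True, False, True]
# Hypothetical stand-ins for Layrz messages, one every 30 minutes
messages = [
    SimpleNamespace(
        received_at=start + timedelta(minutes=30 * i),
        sensors={"engine.ignition.status": on},
    )
    for i, on in enumerate(ignition)
]

previous = None
history = []
for message in messages:
    # Assumption: the result is stored as the "engine.hours" sensor of the message
    message.sensors["engine.hours"] = calculate(message, previous, None)
    history.append(message.sensors["engine.hours"])
    previous = message

print(history)  # [0, 0.5, 0.5, 1.0]
```

The last step shows a property of this approach: the whole interval before an "on" message is counted as running time, even though the previous message reported the engine as off.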
Maintenance required
Using the engine.hours sensor, we can calculate if the engine requires maintenance.
Disclaimer: we are assuming that the engine requires maintenance after every 100 hours of use. When the engine reaches 100 hours, we return true for this trigger, and then wait for the next 100 hours to return true again.
def validate(message, previous_message, asset):
    # Without a previous message we cannot detect that a threshold was crossed, so we return False
    if previous_message is None:
        return False
    # First of all, we need to get the engine hours
    engine_hours = message.sensors.get("engine.hours", 0)
    previous_engine_hours = previous_message.sensors.get("engine.hours", 0)
    # Then, we count how many complete 100-hour intervals each reading has passed
    intervals = engine_hours // 100
    previous_intervals = previous_engine_hours // 100
    # Finally, we return True only when the current reading has crossed a new multiple of 100
    return intervals > previous_intervals
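The "fire once every 100 hours" behavior described above can be expressed with integer division: the trigger fires when the current reading falls in a later 100-hour interval than the previous one. The following is a minimal sketch of that idea with a hypothetical helper named `crossed_interval`, replayed over a series of readings.

```python
def crossed_interval(previous_hours, current_hours, interval=100):
    # Floor division gives the number of complete intervals passed so far
    return current_hours // interval > previous_hours // interval


readings = [98.0, 99.5, 100.2, 150.0, 199.9, 200.1]
# Compare each reading with the one before it
triggers = [
    crossed_interval(previous, current)
    for previous, current in zip(readings, readings[1:])
]
print(triggers)  # [False, True, False, False, True]
```

The trigger fires at the 100-hour and 200-hour crossings only, and stays false between them.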
Engine status report
The reports module supports Python functions to generate report pages. In this example, we will generate a report page with the engine ignition status. We assume that your asset has a sensor for that with the param engine.ignition.status.
def process_report(messages, configuration, assets):
    # First of all, we create a dict with the assets to simplify the access to the asset name
    assets_by_pk = {asset.pk: asset.name for asset in assets}
    # Now, we filter all of the messages that have the engine ignition status sensor
    filtered_messages = [message for message in messages if message.sensors.get("engine.ignition.status") is not None]
    # Now, we'll create the headers, but we'll use a width calculation to make the report more readable
    raw_headers = [
        "Asset",
        "Engine ignition status",
        "Time"
    ]
    sizes = [len(header) * 1.6 for header in raw_headers]
    # Now, we'll define the rows of the report
    rows = []
    tz = zoneinfo.ZoneInfo("America/Panama")
    for message in filtered_messages:
        # First of all, we need to get the asset name
        asset_name = assets_by_pk.get(message.asset_id, "Unknown asset")
        # Now, we compare the length of the asset name with the current size, and if it is greater, we update the size
        if len(asset_name) > sizes[0]:
            sizes[0] = len(asset_name)
        # We repeat the process with the engine ignition status
        engine_status = "On" if message.sensors.get("engine.ignition.status") else "Off"
        if len(engine_status) > sizes[1]:
            sizes[1] = len(engine_status)
        # And finally, with the time
        msg_at = message.received_at.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S")
        if len(msg_at) > sizes[2]:
            sizes[2] = len(msg_at)
        # For each message, we'll create a row with the asset name, the engine ignition status and the time
        row = ReportRow(
            content=[
                # The name of the asset
                ReportCol(content=asset_name),
                # The engine ignition status in a human-readable format
                ReportCol(content=engine_status),
                # Finally, the time of the message
                ReportCol(content=msg_at)
            ],
        )
        # Append the row to the rows list
        rows.append(row)
    # Now, we'll create the real headers
    headers = [ReportCol(
        content=header,  # The header name
        width=sizes[i],  # The width of the header
        color='#001e60',  # The color of the header
        text_color='#ffffff',  # The text color of the header
    ) for i, header in enumerate(raw_headers)]
    return ReportPage(
        name='Engine status report',  # The name of the report
        headers=headers,  # The headers of the report
        rows=rows,  # The rows of the report
    )
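The width calculation in this report can be isolated into a small helper that is easy to test on its own. The sketch below uses plain lists of strings instead of the report classes. The name `column_widths` is hypothetical, and the 1.6 factor is the one used for the headers in the example above.

```python
def column_widths(headers, rows, header_factor=1.6):
    # Start from the header lengths, scaled by the same factor as the report example
    widths = [len(header) * header_factor for header in headers]
    for row in rows:
        for i, cell in enumerate(row):
            # Grow the column when a cell is longer than the current width
            widths[i] = max(widths[i], len(str(cell)))
    return widths


headers = ["Asset", "Engine ignition status", "Time"]
rows = [
    ["Truck with a very long name 02", "On", "2024-01-01 07:00:00"],
    ["Van", "Off", "2024-01-01 07:05:00"],
]
widths = column_widths(headers, rows)
print(widths)
```

Here the first and third columns grow to fit their longest cell, while the second keeps its header-based width because "On" and "Off" are shorter than the header.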
Kilowatts per day
The idea of this chart is simple: display the kilowatt-hours consumed per day by the assets in a column chart. To do that, we assume that your assets have a sensor for that with the param power.kwh.
def calculate_series(messages, configuration, assets):
    # First of all, we need to create a dict with the last kwh reading by asset and day
    kwh_max_by_asset_day = {}
    # Then, a dict with the first kwh reading of each asset, used as the baseline of its first day
    kwh_init_by_asset = {}
    # Then, a dict with the kwh consumed by asset and day
    kwh_by_asset_day = {}
    # And finally, we need to create the x axis
    x_axis = []
    tz = zoneinfo.ZoneInfo("America/Panama")
    # Now, we need to iterate all of the messages, which are assumed to be sorted by received_at
    for message in messages:
        # First of all, we need to get the kwh
        value = message.sensors.get('power.kwh', None)
        # If the message has no numeric kwh reading, we skip it
        if not isinstance(value, (int, float)):
            continue
        # Otherwise, we round the value to 2 decimals
        value = round(value, 2)
        # After that, get the month and day of the message
        current_day = message.received_at.astimezone(tz).strftime('%m/%d')
        # If this day is not in the x axis, we append it
        if current_day not in x_axis:
            x_axis.append(current_day)
        # The first reading of each asset becomes its baseline
        kwh_init_by_asset.setdefault(message.asset_id, value)
        # Later readings of the same day overwrite the earlier ones, so the last one is kept
        kwh_max_by_asset_day.setdefault(message.asset_id, {})[current_day] = value
    # Now, we need to iterate all of the assets
    for asset_id, values in kwh_max_by_asset_day.items():
        kwh_by_asset_day[asset_id] = {}
        kwh_init = kwh_init_by_asset[asset_id]
        # Then, we need to iterate all of the days
        for day, kwh_max in values.items():
            # The consumption of the day is its last reading minus the previous baseline
            kwh_by_asset_day[asset_id][day] = round(kwh_max - kwh_init, 2)
            kwh_init = kwh_max
    y_axis = []
    # Now, we need to iterate all of the assets to define the y axis
    for asset in assets:
        # If the asset has no kwh data, we skip it
        if asset.pk not in kwh_by_asset_day:
            continue
        # Then, we align the data with the x axis, using 0 for the days without readings
        data = [kwh_by_asset_day[asset.pk].get(day, 0) for day in x_axis]
        # Finally, we append the asset to the y axis
        y_axis.append(ChartDataSerie(
            label=asset.name,
            color='#' + ''.join(random.choice('0123456789ABCDEF') for _ in range(6)),  # This operation generates a random color
            data=data,
            data_type=ChartDataType.NUMBER
        ))
    # Finally, we return the column chart
    return ColumnChart(
        title=configuration.name,
        align=ChartAlignment.CENTER,
        x_axis=ChartDataSerie(
            data=x_axis,
            label='Days',
            color='#000000',
            data_type=ChartDataType.STRING,
        ),
        y_axis=y_axis,
    )
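The core of this chart is turning a cumulative kWh counter into per-day consumption. The sketch below isolates that step for a single asset, assuming the readings arrive in chronological order as `(day, cumulative_kwh)` pairs. The name `daily_consumption` and the sample data are hypothetical.

```python
def daily_consumption(readings):
    # Keep the last cumulative reading of each day (dicts preserve insertion order)
    last_by_day = {}
    for day, kwh in readings:
        last_by_day[day] = kwh

    consumption = {}
    # The first reading overall is the baseline for the first day
    baseline = readings[0][1] if readings else None
    for day, last_kwh in last_by_day.items():
        consumption[day] = round(last_kwh - baseline, 2)
        # The last reading of this day is the baseline for the next one
        baseline = last_kwh
    return consumption


readings = [
    ("01/01", 100.0),
    ("01/01", 112.5),
    ("01/02", 120.0),
    ("01/02", 130.25),
]
per_day = daily_consumption(readings)
print(per_day)  # {'01/01': 12.5, '01/02': 17.75}
```

With several assets, each one needs its own baseline. Sharing a single baseline across assets would subtract one asset's counter from another's.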