Big and complex data are the fuel of the Internet of Things (IoT). The RAW platform, with its comprehensive support for complex datasets, is ideally suited as a real-time data integration and analytics platform for IoT. Let's see how!
The Scenario
Imagine we are site reliability engineers managing control software for industrial machines. Recently we have been seeing suspicious software crashes, and we want to create a data service that collects diagnostics and data to help us identify the cause(s).
The data
We have operational data stored in a database and in an S3 bucket, in several formats: log files, CSV and JSON.
We have the following data sources:
- Machine status information logged periodically and stored in a PostgreSQL database.
- Software crash information from Docker, in JSON format as produced by the familiar docker inspect command.
- Sensor data from the machines, exported as CSV.
- Error logs, stored in an S3 bucket in familiar log file formats that require parsing.
Machine Status
Machines are monitored, and their status and location are stored in a PostgreSQL database. The table looks like this:
id | model | age | status | latitude | longitude |
---|---|---|---|---|---|
1 | model3 | 18 | OK | 46.515471 | 6.644706 |
2 | model4 | 7 | OK | 46.564782 | 6.551355 |
3 | model3 | 8 | OK | 46.537984 | 6.629472 |
4 | model3 | 7 | OK | 46.570500 | 6.591574 |
To read it, we can use PostgreSQL.InferAndRead. Here we create a function that takes a machine ID and returns the corresponding record from the table.
machine(id: int) =
    let
        data = PostgreSQL.InferAndRead(
            "raw",
            "example",
            "machines",
            host = "example-psql.raw-labs.com",
            username = "pgsql_guest",
            password = "..."
        )
    in
        Collection.First(Collection.Filter(data, (x) -> x.id == id))
We are reading data from the database "raw", schema "example" and table "machines". The output for id = 1 is:
{
"id": 1,
"model": "model3",
"age": 18,
"status": "OK",
"latitude": 46.515471,
"longitude": 6.644706
}
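The machine function reads the table and keeps the first row whose id matches. As a rough cross-check of that logic outside RAW, here is a minimal Python sketch that uses an in-memory SQLite table as a stand-in for the PostgreSQL database (an assumption for illustration: the rows are the four sample rows from the table above, and the real connection settings are not used). It expresses the same lookup as a parameterized query.

```python
import sqlite3
from typing import Optional

# In-memory SQLite stand-in for the PostgreSQL "machines" table.
# The rows are the four sample rows shown above, not the real database.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE machines (id INTEGER PRIMARY KEY, model TEXT, age INTEGER,"
    " status TEXT, latitude REAL, longitude REAL)"
)
conn.executemany(
    "INSERT INTO machines VALUES (?, ?, ?, ?, ?, ?)",
    [
        (1, "model3", 18, "OK", 46.515471, 6.644706),
        (2, "model4", 7, "OK", 46.564782, 6.551355),
        (3, "model3", 8, "OK", 46.537984, 6.629472),
        (4, "model3", 7, "OK", 46.570500, 6.591574),
    ],
)


def machine(machine_id: int) -> Optional[dict]:
    """Return the machine record with this id, or None if there is none."""
    # The id is passed as a query parameter, so the filter runs inside the database.
    row = conn.execute(
        "SELECT * FROM machines WHERE id = ?", (machine_id,)
    ).fetchone()
    return dict(row) if row is not None else None


print(machine(1))
```

Passing the id as a query parameter, rather than formatting it into the SQL string, also keeps the lookup safe from SQL injection.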
Software crashes from Docker
Each machine has a specific service controlling it. These services are deployed using Docker, and the status of this software can be extracted from the output of the docker inspect command. That output is a (long) JSON document; here is an excerpt:
{
"Id": "806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3",
"Name": "machine 98",
"Created": "2015-11-26T06:00:00.000",
"Path": "bash",
"Image": "sha256:9873176a8ff5ac192ce4d7df8a403787558b9f3981a4c4d74afb3edceeda451c",
"Driver": "overlay2",
"Platform": "linux",
"Args": [
"arg1"
],
"State": {
"Status": "running",
"Running": false,
"Paused": false,
"Restarting": false,
"OOMKilled": false,
"Dead": true,
"Pid": 86629,
"ExitCode": 3,
"Error": "comp3",
"StartedAt": "2015-11-26T06:00:00.000",
"FinishedAt": "2015-11-26T06:00:00.000"
},
"ResolvConfPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/resolv.conf",
"HostnamePath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/hostname",
"HostsPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/hosts",
"LogPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3-json.log",
"RestartCount": 0,
We can get the corresponding machine from the Name field. The State field contains an ExitCode, which tells us whether the software finished successfully (exit code 0) or not.
The following function extracts the relevant information in an easier-to-consume tabular form.
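failures(id: int) =
    let
        dockerInspect = Json.InferAndRead(
            "s3://raw-tutorial/ipython-demos/predictive-maintenance/docker-inspect-output.json"
        ),
        summary = Collection.Transform(
            dockerInspect,
            (x) ->
                {
                    // Name is "machine <n>": take every character from position 9 on,
                    // so multi-digit ids such as "machine 98" are parsed in full.
                    MachineId: Int.From(String.SubString(x.Name, 9, String.Length(x.Name) - 8)),
                    ExitCode: x.State.ExitCode,
                    Error: x.State.Error,
                    FinishedAt: x.State.FinishedAt
                }
        )
    in
        // Keep only failed runs (non-zero exit code) of the requested machine.
        Collection.Filter(
            summary,
            (x) -> x.ExitCode > 0 and x.MachineId == id
        )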
failures(1)
Output:
[
{
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-01-05T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 1,
"Error": "comp1",
"FinishedAt": "2015-03-06T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 2,
"Error": "comp2",
"FinishedAt": "2015-04-20T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-06-19T06:00:00.000"
},
...
]
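Because container names look like "machine 98", the machine id has to be read from everything after the "machine " prefix; taking a single character would turn machine 98 into machine 9. The following minimal Python sketch of the failures logic makes that explicit with a regular expression. The input records are hypothetical: the "machine 98" record reuses values from the docker inspect excerpt above, and the "machine 9" record is invented to show the filter.

```python
import re

# Hypothetical docker inspect records, trimmed to the fields used here.
# The first reuses values from the excerpt above; the second is invented.
inspect_output = [
    {"Name": "machine 98",
     "State": {"ExitCode": 3, "Error": "comp3", "FinishedAt": "2015-11-26T06:00:00.000"}},
    {"Name": "machine 9",
     "State": {"ExitCode": 0, "Error": "", "FinishedAt": "2015-11-27T06:00:00.000"}},
]

NAME_PATTERN = re.compile(r"machine (\d+)")


def machine_id(name):
    """Parse every digit after "machine ", so "machine 98" gives 98, not 9."""
    match = NAME_PATTERN.fullmatch(name)
    if match is None:
        raise ValueError(f"unexpected container name: {name!r}")
    return int(match.group(1))


def failures(wanted_id, records=inspect_output):
    """Flatten each record, then keep non-zero exit codes for one machine."""
    summary = [
        {
            "MachineId": machine_id(r["Name"]),
            "ExitCode": r["State"]["ExitCode"],
            "Error": r["State"]["Error"],
            "FinishedAt": r["State"]["FinishedAt"],
        }
        for r in records
    ]
    return [s for s in summary if s["ExitCode"] > 0 and s["MachineId"] == wanted_id]


print(failures(98))
```

With a one-character substring, the failure of machine 98 would be attributed to machine 9; parsing the full number keeps each failure with its own machine.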
Error logs
Errors are collected from logs. The logs are uploaded to a logging service, which in turn collects all records and saves them to an S3 bucket. A sample of the log file:
2015-01-01T05:54:15 WARN vibration close to treshold, check instrumentation panel ASAP.
2015-01-01T05:54:58 INFO calibration at 100%, checking inner sub-systems.
2015-01-01T05:55:41 ERROR voltage not measured for more than 25 seconds, reboot machine.
2015-01-01T05:56:24 INFO cleaning procedure schedulled soon, performing sub task 111.
2015-01-01T05:57:07 INFO task 155 schedulled soon, preparing next task.
2015-01-01T05:57:50 WARN inner temp increasing rapidly, please check internet connection.
2015-01-01T05:58:33 INFO cleaning procedure starting, calibrating.
2015-01-01T06:00:00 WARN machine 24 with error=error1
2015-01-01T05:54:15 ERROR inner temp not measured for more than 16 seconds, please call 041 123 456 789.
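Only lines of the form "<timestamp> WARN machine <n> with error=<code>" report machine errors. As a quick offline check of which sample lines qualify, here is a minimal Python sketch that applies the same pattern as the RAW filter that follows, using the re module. Because the pattern starts with (.*) and ends with .*, a whole-string match (re.fullmatch) and a substring search accept exactly the same single-line strings here.

```python
import re

# Lines copied from the log sample above.
lines = [
    "2015-01-01T05:55:41 ERROR voltage not measured for more than 25 seconds, reboot machine.",
    "2015-01-01T06:00:00 WARN machine 24 with error=error1",
    "2015-01-01T05:57:50 WARN inner temp increasing rapidly, please check internet connection.",
    "2015-01-01T05:58:33 INFO cleaning procedure starting, calibrating.",
]

# The pattern used by the RAW filter, written as a Python raw string
# (the doubled backslashes of the RAW string literal become single ones).
PATTERN = re.compile(r"(.*) WARN machine (\d+) with error=(\w+).*")

# Keep only the lines that report a machine error.
filtered = [line for line in lines if PATTERN.fullmatch(line)]
print(filtered)
```

Only the "WARN machine 24" line survives; the other WARN line has the right level but not the "machine <n> with error=" shape.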
This file has a lot of data, but right now we are only interested in lines that report machine errors. We can use Collection.Filter and a regex to remove all unwanted lines, like this:
let
    data = String.ReadLines(
        "s3://raw-tutorial/ipython-demos/predictive-maintenance/machine_logs.log"
    ),
    filtered = Collection.Filter(
        data,
        (x) ->
            Regex.Matches(x, "(.*) WARN machine (\\d+) with error=(\\w+).*")
    )
in
    filtered
Output:
[
"2015-01-01T06:00:00 WARN machine 24 with error=error1",
"2015-01-01T06:00:00 WARN machine 73 with error=error4",
"2015-01-01T06:00:00 WARN machine 81 with error=error1",
"2015-01-01T07:00:00 WARN machine 43 with error=error3",
"2015-01-01T08:00:00 WARN machine 14 with error=error4",
"2015-01-01T08:00:00 WARN machine 76 with error=error5",
...
]
Now we can use Regex.Groups to extract all the relevant fields. This is what the final function looks like:
errors(id: int) =
    let
        data = String.ReadLines(
            "s3://raw-tutorial/ipython-demos/predictive-maintenance/machine_logs.log"
        ),
        filtered = Collection.Filter(
            data,
            (x) ->
                Regex.Matches(
                    x,
                    "(.*) WARN machine (\\d+) with error=(\\w+).*"
                )
        ),
        parsed = Collection.Transform(
            filtered,
            (x) ->
                let
                    groups = Regex.Groups(
                        x,
                        "(.*) WARN machine (\\d+) with error=(\\w+).*"
                    )
                in
                    {
                        machineId: Int.From(List.Get(groups, 1)),
                        timestamp: Timestamp.Parse(
                            List.Get(groups, 0),
                            "y-M-d'T'H:m:s"
                        ),
                        error: List.Get(groups, 2)
                    }
        )
    in
        Collection.Filter(parsed, (x) -> x.machineId == id)
errors(1)
Output:
[
{
"machineId": 1,
"timestamp": "2015-01-03T07:00:00.000",
"error": "error1"
},
{
"machineId": 1,
"timestamp": "2015-01-03T20:00:00.000",
"error": "error3"
},
{
"machineId": 1,
"timestamp": "2015-01-04T06:00:00.000",
"error": "error5"
},
...
]
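Regex.Groups returns the captured groups, which the query then converts to an integer and a timestamp. A Python equivalent of that parsing step, written as a minimal sketch with hypothetical function names parse and errors, looks like this. Note the indexing difference: Python numbers capture groups from 1 (group 0 is the whole match), while the RAW query reads the same three groups at List.Get indexes 0, 1 and 2.

```python
import re
from datetime import datetime

# Same pattern as in the errors function.
PATTERN = re.compile(r"(.*) WARN machine (\d+) with error=(\w+).*")


def parse(line):
    """Turn a machine-error log line into a record; return None for any other line."""
    m = PATTERN.fullmatch(line)
    if m is None:
        return None
    # Python numbers capture groups from 1; group(0) would be the whole line.
    return {
        "machineId": int(m.group(2)),
        "timestamp": datetime.strptime(m.group(1), "%Y-%m-%dT%H:%M:%S"),
        "error": m.group(3),
    }


def errors(lines, machine_id):
    """Parse every line and keep the records for one machine."""
    parsed = (parse(line) for line in lines)
    return [r for r in parsed if r is not None and r["machineId"] == machine_id]


sample = [
    "2015-01-01T06:00:00 WARN machine 24 with error=error1",
    "2015-01-01T06:00:00 WARN machine 73 with error=error4",
    "2015-01-01T05:58:33 INFO cleaning procedure starting, calibrating.",
]
print(errors(sample, 24))
```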
Sensor data
Sensor data is collected and stored in CSV files. We can read it using the following function:
telemetry(id: int) =
    Collection.Filter(
        Csv.InferAndRead(
            "s3://raw-tutorial/ipython-demos/predictive-maintenance/telemetry-iso-time.csv"
        ),
        (x) -> x.machineID == id
    )
Output:
[
{
"datetime": "1/1/2015 6:00:00 AM",
"machineID": 1,
"volt": 176.217853015625,
"rotate": 418.504078221616,
"pressure": 113.077935462083,
"vibration": 45.0876857639276
},
{
"datetime": "1/1/2015 7:00:00 AM",
"machineID": 1,
"volt": 162.87922289706,
"rotate": 402.747489565395,
"pressure": 95.4605253823187,
"vibration": 43.4139726834815
},
...
]
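The lastFailureData function further down compares datetime with timestamps, so that column must be a real timestamp rather than text. If it arrives as strings like "1/1/2015 6:00:00 AM", as in the sample output above, it needs explicit parsing. Here is a minimal Python sketch using the standard csv module and two rows copied from that output; the column order in the header is an assumption.

```python
import csv
import io
from datetime import datetime

# Two rows copied from the output above; the header order is an assumption.
CSV_TEXT = """datetime,machineID,volt,rotate,pressure,vibration
1/1/2015 6:00:00 AM,1,176.217853015625,418.504078221616,113.077935462083,45.0876857639276
1/1/2015 7:00:00 AM,1,162.87922289706,402.747489565395,95.4605253823187,43.4139726834815
"""


def telemetry(text, machine_id):
    """Read the CSV text, keep one machine's rows and convert every field."""
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        if int(row["machineID"]) != machine_id:
            continue
        rows.append({
            # "M/D/YYYY h:mm:ss AM" -> datetime, so rows can be compared with timestamps.
            "datetime": datetime.strptime(row["datetime"], "%m/%d/%Y %I:%M:%S %p"),
            "machineID": int(row["machineID"]),
            "volt": float(row["volt"]),
            "rotate": float(row["rotate"]),
            "pressure": float(row["pressure"]),
            "vibration": float(row["vibration"]),
        })
    return rows


print(telemetry(CSV_TEXT, 1)[0]["datetime"])
```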
Collecting our information for analysis
Now that we have all the sources defined, we can start digging into the data for answers. For a given machine (id), we would like to collect information about its last failure. We are interested in:
- Basic information such as the error, timestamp, machine age and model, from 'failures' (the Docker JSON file) and 'machines' (the database table).
- Sensor data from the 6 hours before the crash ('telemetry', from the sensor CSV file).
- Errors from the 6 hours before the crash ('errors', from the log files).
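The 6-hour window in the last two bullets is plain timestamp arithmetic: subtract 6 hours from the time of the last failure and keep records that fall strictly between the two instants. A small Python sketch, using the last-failure time 2015-12-31T06:00:00 that appears in this article's final output and hypothetical hourly readings:

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=6)


def in_window(ts, last_failure, window=WINDOW):
    """True if ts lies strictly between last_failure - window and last_failure."""
    start = last_failure - window
    return start < ts < last_failure


last_failure = datetime(2015, 12, 31, 6, 0, 0)
# Hypothetical hourly readings from 00:00 to 06:00 on the day of the failure.
readings = [datetime(2015, 12, 31, hour) for hour in range(7)]
kept = [ts for ts in readings if in_window(ts, last_failure)]
print([ts.strftime("%H:%M") for ts in kept])
```

The strict comparisons exclude a reading taken exactly at the window start or exactly at the failure, so seven hourly readings yield five records (01:00 to 05:00).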
Let's create a function, lastFailureData, that aggregates the necessary data from each of the functions created above.
lastFailureData(machineId: int) =
    let
        machineData = machine(machineId),
        failureData = failures(machineId),
        lastFailure = Collection.Max(failureData.FinishedAt),
        startMeasure = Timestamp.SubtractInterval(
            lastFailure,
            Interval.Build(hours = 6)
        ),
        lastFailureRecord = Collection.First(
            Collection.Filter(
                failureData,
                (x) -> x.FinishedAt == lastFailure
            )
        ),
        lastTelemetry = Collection.Filter(
            telemetry(machineId),
            (x) ->
                x.datetime < lastFailure and x.datetime > startMeasure
        ),
        lastErrors = Collection.Filter(
            errors(machineId),
            (x) ->
                x.timestamp < lastFailure and x.timestamp > startMeasure
        )
    in
        {
            machineId: machineId,
            age: machineData.age,
            model: machineData.model,
            lastFailure: lastFailureRecord,
            lastTelemetry: lastTelemetry,
            lastErrors: lastErrors
        }
lastFailureData(1)
Output:
{
"machineId": 1,
"age": 18,
"model": "model3",
"lastFailure": {
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-12-31T06:00:00.000"
},
"lastTelemetry": [
{
"datetime": "2015-12-31T01:00:00.000",
"machineID": 1,
"volt": 147.720615260015,
"rotate": 493.074645851158,
"pressure": 104.81366016439,
"vibration": 41.2714171061972
},
{
"datetime": "2015-12-31T02:00:00.000",
"machineID": 1,
"volt": 153.93048096902,
"rotate": 353.466012177296,
"pressure": 99.6570720990314,
"vibration": 42.806176552987
},
{
"datetime": "2015-12-31T03:00:00.000",
"machineID": 1,
"volt": 175.481807900786,
"rotate": 475.951631160907,
"pressure": 88.7452579535092,
"vibration": 39.9863347521755
},
{
"datetime": "2015-12-31T04:00:00.000",
"machineID": 1,
"volt": 179.860806868559,
"rotate": 461.478368479999,
"pressure": 120.299989462607,
"vibration": 35.8235042398746
},
{
"datetime": "2015-12-31T05:00:00.000",
"machineID": 1,
"volt": 172.645716803532,
"rotate": 386.985814610685,
"pressure": 96.0729702714405,
"vibration": 35.7556427077587
}
],
"lastErrors": []
}
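lastFailureData finds the latest FinishedAt with Collection.Max and then filters for the record that carries it. In Python, max with a key function returns that record in a single pass. The sketch below assembles the same summary from hypothetical in-memory inputs (dicts with datetime fields); returning None when a machine has no failures is a choice made for the sketch.

```python
from datetime import datetime, timedelta


def last_failure_data(machine, failures, telemetry, errors, window=timedelta(hours=6)):
    """Summarise the last failure of one machine, or return None if it never failed."""
    if not failures:
        return None
    # max() with a key returns the whole record holding the latest FinishedAt.
    last = max(failures, key=lambda f: f["FinishedAt"])
    end = last["FinishedAt"]
    start = end - window
    return {
        "machineId": machine["id"],
        "age": machine["age"],
        "model": machine["model"],
        "lastFailure": last,
        # Strict bounds, matching the < and > comparisons in lastFailureData.
        "lastTelemetry": [t for t in telemetry if start < t["datetime"] < end],
        "lastErrors": [e for e in errors if start < e["timestamp"] < end],
    }


# Hypothetical inputs: machine fields and two failure dates taken from the article,
# plus invented hourly telemetry timestamps without sensor values.
machine = {"id": 1, "age": 18, "model": "model3"}
failures = [
    {"MachineId": 1, "ExitCode": 4, "Error": "comp4", "FinishedAt": datetime(2015, 6, 19, 6)},
    {"MachineId": 1, "ExitCode": 4, "Error": "comp4", "FinishedAt": datetime(2015, 12, 31, 6)},
]
telemetry = [{"datetime": datetime(2015, 12, 31, h), "machineID": 1} for h in range(7)]
summary = last_failure_data(machine, failures, telemetry, errors=[])
print(summary["lastFailure"]["FinishedAt"], len(summary["lastTelemetry"]))
```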
Want to learn more? Join us on Discord to leave your thoughts.