Anupam Bagchi

Anupam Bagchi has been in the Information Technology industry in Silicon Valley since the early nineties. He has a Ph.D. in Robotics and is deeply interested in recent advances in Machine Learning and Artificial Intelligence. Professionally, he is a Data Scientist. His primary hobby is photography.

Unknown Known

Donald Rumsfeld - Knowns and Unknowns

Back when the US was at war with Iraq, Donald Rumsfeld would come on TV and give a status update on the war. In one of those updates, he framed the status in terms of "known" elements and "unknown" elements. He introduced a concept that is very useful in the business world. Here is my interpretation of the same concept applied to life in general. It holds true for business exploration, assessing project risk, research and development, or engineering design - wherever you are attempting to find a solution to a new problem.

The Known-Unknown Matrix

Assuming you are attempting to find the solution to a complex problem, you will encounter many items that will fall into one of these four categories.

Known Unknown Matrix

The goal of every project is to move all items towards the KNOWN KNOWNS category (Quadrant 4). However, as you start off and move along - analysing every element of the project one by one - many things start off in the other quadrants (1, 2 or 3) and then slowly move to adjacent quadrants until they reach the Known-Knowns quadrant. Let us analyze the items in each of these four quadrants one by one.

KNOWN KNOWNS (Quadrant 4)

At project inception, these are the elements you generally work with. Breaking down a project into several sub-projects, the goal of each sub-project is clearly defined, as is the path to reach it. So your work will involve accomplishing the task with all known elements in sight, using a methodology that is also known to you. However, there are always risks in any project that are beyond control. In this case, those risks are also known to you, and the steps to counter those risks, when they do arise, are also known to you. In other words, you are well prepared to complete the project given the probability of the defined risks that are already known to you. Thus this quadrant is about the probabilistic risks of a project.

A world where everything is known is awesome - including the project goals, the approach to achieve those goals, the risk elements of the project, and the means to counteract them. But alas, the world is not so simple. And honestly, if it were that simple, there would be no challenge in undertaking a new project, because you would be treading a beaten path all the time with predictable results. This brings us to the other known quadrant.

KNOWN UNKNOWNS (Quadrant 2)

Since you are not the only person who has


Saturday, 18 August 2018 21:57

Managing effectiveness of Email campaigns

A Behavior Analysis Problem

Despite the growth of alternative communication channels like chat applications (WhatsApp, Slack, Snapchat), social websites (Facebook, Google Plus, Twitter), and communication via icons (emojis) and images (Instagram), good old email is not dead yet. Almost all companies still rely on email not only to run their business, but also to market to their customers. This has, in itself, evolved into a field of study. The main concern facing any marketing manager is controlling the amount of email sent to clients so that the customer stays interested in the company and does not unsubscribe because of too much junk mail. The balance between too much mail and too little mail is therefore very delicate.

We can employ data science to solve this problem. The problem solved here is taken from a real company's data and was presented to me as a test, so I thought it was a good problem to solve and a good way to demonstrate the use of data science. Emailing customers too frequently can cause them to unsubscribe from the email list, or to stop opening the emails altogether - which in turn can cause promotional emails to end up in their junk mailbox. The goal is to find the optimum number of emails to send to each customer so that they stay interested over time. Note that this number is different for every customer. We want to predict a customer's propensity to open an email in the future based on observed historical trends, as well as their propensity to unsubscribe if they do open the email.

The Dataset

Before I start describing the dataset, you may be inclined to see it first. To introduce some variety into my solution approach - and since most of the data is tabular in nature - I have decided to use a relational database (MySQL) to store and query the data, using JOINs where necessary to generate my feature set. You can download the entire dump of the MySQL database and restore it on your local machine to get started.
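If you want to follow along, a typical way to restore the dump from the mysql client looks like the sketch below; the dump file path is hypothetical, and the database name matches the emailcampaign schema used later in this article.

  1. mysql> # hypothetical path below: point SOURCE at wherever you saved the downloaded dump
  2. mysql> CREATE DATABASE emailcampaign;
  3. mysql> USE emailcampaign;
  4. mysql> SOURCE /path/to/emailcampaign_dump.sql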

What data do we have?

We have data in 5 tables as follows:

  1. Campaign Types - A historical list of campaigns with date and campaign type
  2. User Info - a list of user ids along with registration date and the source from where their email was acquired
  3. Sends - a list of dates when an email was sent to the user and a flag indicating if the email was opened
  4. Opens - a list of dates when an email was opened by the subscriber, along with a flag indicating whether they then unsubscribed
  5. Events - a list of events pertaining to each user along with counts. The count represents different things depending on the type of event.

Let me describe in detail what each column in the given tables means.

Campaign Types

  • launch_id (Date): Unique identifier of a campaign by date of send.
  • camptype (String): Type of campaign (email send), where “Heavy Promo”, “Light Promo” and “Evergreen” (No Promo) refer to the magnitude of the promo offer, and “Primary Messaging”/”Secondary Messaging” refer to the degree to which the promotion is messaged within the email content.

User Info

  • riid (Integer): Unique identifier of a customer.
  • aq_dt (Datetime): Date the customer's email was acquired, as stored in the current email system.
  • sub_source (String): Description of where the customer registered their email.

Sends

  • riid (Integer): Unique identifier of a customer.
  • campaign_send_dt (Datetime): Send date of the campaign.
  • opened (Integer): 0 or 1, indicating whether the given riid opened the given campaign.

Opens

  • riid (Integer): Unique identifier of a customer.
  • campaign_send_dt (Datetime): Send date of the campaign.
  • optout (Integer): 0 or 1, indicating whether the given riid opened the given campaign and then unsubscribed.

Events

  • riid (Integer): Unique identifier of a customer.
  • event (String): Name of the event the record is associated with:
      • Open (opened an email)
      • Sent (received an email)
      • Click (clicked an email)
      • Online Purch (made a purchase online)
  • event_captured_dt (Datetime): Date the event was recorded.
  • cnts (Integer): Different aggregate measures depending on the associated event:
      • Open: count of all opens recorded for an individual on a given day
      • Sent: count of all sends recorded for an individual on a given day
      • Click: count of all clicks recorded for an individual on a given day
      • Online Purch: sum total of the number of items purchased by an individual on a given day
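Once the dump is restored, these definitions can be cross-checked directly from the mysql client, for example:

  1. mysql> SHOW TABLES;
  2. mysql> DESCRIBE events;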

Understanding the dataset

Before we jump into a predictive model building, let us analyze the data to understand it better.

Aggregate open rate

As a first exercise, let us attempt to determine the aggregate open rate - in other words, what percentage of emails were actually opened. This is a very simple ratio: the count of emails opened divided by the total count of emails sent. We can do this in one SQL statement as follows:

  1. mysql> select (select count(*) from sends where opened = 1)/count(*) from sends;
  2. +--------------------------------------------------------+
  3. | (select count(*) from sends where opened = 1)/count(*) |
  4. +--------------------------------------------------------+
  5. | 0.1863 |
  6. +--------------------------------------------------------+
  7. 1 row in set (0.13 sec)
  8.  

The answer is about 18%.

Aggregate unsubscribe per open rate

Of the people who opened their email, how many got irritated and unsubscribed? Again, this is a simple SQL query on the "opens" table as follows:

  1. mysql> select (select count(*) from opens where optout = 1)/count(*) from opens;
  2. +--------------------------------------------------------+
  3. | (select count(*) from opens where optout = 1)/count(*) |
  4. +--------------------------------------------------------+
  5. | 0.0137 |
  6. +--------------------------------------------------------+
  7. 1 row in set (0.11 sec)

The answer is about 1.4%.

Aggregate open rate ordered by launch Id

How about calculating the open rate by launch id? Since at most one campaign is sent per day, grouping by the campaign send date is equivalent to grouping by launch id. This is a simple GROUP BY query:

  1. mysql> SELECT t1.campaign_send_dt AS campaign_send_date, t1.total_daily_sends, t2.total_opened,
  2. (t2.total_opened/t1.total_daily_sends) AS ratio FROM (SELECT campaign_send_dt, COUNT(*) AS total_daily_sends FROM sends
  3. GROUP BY campaign_send_dt ORDER BY campaign_send_dt) AS t1
  4. LEFT JOIN (SELECT campaign_send_dt, COUNT(*) AS total_opened FROM sends
  5. WHERE opened = 1 GROUP BY campaign_send_dt ORDER BY campaign_send_dt) AS t2
  6. ON t1.campaign_send_dt = t2.campaign_send_dt;
  7. +---------------------+-------------------+--------------+--------+
  8. | campaign_send_date | total_daily_sends | total_opened | ratio |
  9. +---------------------+-------------------+--------------+--------+
  10. | 2018-01-01 00:00:00 | 1084 | 310 | 0.2860 |
  11. | 2018-01-03 00:00:00 | 4575 | 790 | 0.1727 |
  12. | 2018-01-05 00:00:00 | 2074 | 502 | 0.2420 |
  13. | 2018-01-07 00:00:00 | 2582 | 501 | 0.1940 |
  14. | 2018-01-08 00:00:00 | 1606 | 337 | 0.2098 |
  15. | 2018-01-09 00:00:00 | 4210 | 690 | 0.1639 |
  16. | 2018-01-11 00:00:00 | 3316 | 574 | 0.1731 |
  17. | 2018-01-12 00:00:00 | 1905 | 422 | 0.2215 |
  18. | 2018-01-14 00:00:00 | 1862 | 435 | 0.2336 |
  19. | 2018-01-15 00:00:00 | 3141 | 564 | 0.1796 |
  20. | 2018-01-16 00:00:00 | 762 | 169 | 0.2218 |
  21. | 2018-01-18 00:00:00 | 2709 | 571 | 0.2108 |
  22. | 2018-01-19 00:00:00 | 1916 | 413 | 0.2156 |
  23. | 2018-01-21 00:00:00 | 3189 | 582 | 0.1825 |
  24. | 2018-01-22 00:00:00 | 535 | 63 | 0.1178 |
  25. | 2018-01-23 00:00:00 | 1948 | 474 | 0.2433 |
  26. | 2018-01-25 00:00:00 | 2581 | 479 | 0.1856 |
  27. | 2018-01-26 00:00:00 | 1953 | 413 | 0.2115 |
  28. | 2018-01-28 00:00:00 | 1008 | 244 | 0.2421 |
  29. | 2018-01-30 00:00:00 | 219 | 35 | 0.1598 |
  30. | 2018-01-31 00:00:00 | 6347 | 1057 | 0.1665 |
  31. | 2018-02-01 00:00:00 | 1077 | 274 | 0.2544 |
  32. | 2018-02-02 00:00:00 | 2626 | 436 | 0.1660 |
  33. | 2018-02-04 00:00:00 | 1967 | 396 | 0.2013 |
  34. | 2018-02-06 00:00:00 | 4989 | 883 | 0.1770 |
  35. | 2018-02-08 00:00:00 | 1031 | 238 | 0.2308 |
  36. | 2018-02-09 00:00:00 | 2731 | 461 | 0.1688 |
  37. | 2018-02-11 00:00:00 | 1994 | 424 | 0.2126 |
  38. | 2018-02-12 00:00:00 | 1209 | 348 | 0.2878 |
  39. | 2018-02-13 00:00:00 | 1936 | 384 | 0.1983 |
  40. | 2018-02-14 00:00:00 | 56 | 24 | 0.4286 |
  41. | 2018-02-15 00:00:00 | 4241 | 626 | 0.1476 |
  42. | 2018-02-16 00:00:00 | 1882 | 371 | 0.1971 |
  43. | 2018-02-18 00:00:00 | 2554 | 489 | 0.1915 |
  44. | 2018-02-19 00:00:00 | 5686 | 764 | 0.1344 |
  45. | 2018-02-21 00:00:00 | 467 | 141 | 0.3019 |
  46. | 2018-02-23 00:00:00 | 1888 | 363 | 0.1923 |
  47. | 2018-02-25 00:00:00 | 2938 | 525 | 0.1787 |
  48. | 2018-02-26 00:00:00 | 1128 | 314 | 0.2784 |
  49. | 2018-02-27 00:00:00 | 7569 | 1081 | 0.1428 |
  50. | 2018-02-28 00:00:00 | 2509 | 459 | 0.1829 |
  51. +---------------------+-------------------+--------------+--------+
  52. 41 ROWS IN SET (0.33 sec)

The daily open rate lies between roughly 11% and 42%; the 42% outlier occurred on a day when only 56 emails were sent.
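Rather than eyeballing the range, the spread of daily open rates can also be summarized directly; a quick sketch reusing the same sends table:

  1. mysql> SELECT MIN(ratio) AS min_rate, AVG(ratio) AS avg_rate, MAX(ratio) AS max_rate
  2. -> FROM (SELECT campaign_send_dt, SUM(opened)/COUNT(*) AS ratio -- one open rate per send date
  3. -> FROM sends GROUP BY campaign_send_dt) AS daily;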

Aggregate open rate based on registration source

We could also group the results by other criteria, such as sub_source. This query is a bit longer, but here it is below. Note that we are using the WITH clause (common table expressions), a relatively recent addition to MySQL syntax (available from MySQL 8.0).

 

  1. mysql> WITH sends_wider AS (SELECT sends.riid, sends.campaign_send_dt, sends.opened, user_info.aq_dt, user_info.sub_source FROM SENDS
  2. LEFT JOIN user_info ON sends.riid = user_info.riid)
  3. SELECT t1.sub_source AS sub_source, t1.total_daily_sends, t2.total_opened, (t2.total_opened/t1.total_daily_sends) AS ratio
  4. FROM (SELECT sub_source, COUNT(*) AS total_daily_sends FROM sends_wider
  5. GROUP BY sub_source ORDER BY sub_source) AS t1
  6. LEFT JOIN (SELECT sub_source, COUNT(*) AS total_opened FROM sends_wider WHERE opened = 1 GROUP BY sub_source ORDER BY sub_source) AS t2
  7. ON t1.sub_source = t2.sub_source;
  8. +---------------------+-------------------+--------------+--------+
  9. | sub_source | total_daily_sends | total_opened | ratio |
  10. +---------------------+-------------------+--------------+--------+
  11. | Checkout_Online | 26189 | 5091 | 0.1944 |
  12. | eReceipt_Physical | 8750 | 1722 | 0.1968 |
  13. | Facebook CPL_Online | 830 | 125 | 0.1506 |
  14. | Footer_Desktop | 2537 | 555 | 0.2188 |
  15. | Footer_Mobile | 2221 | 407 | 0.1833 |
  16. | Join_Desktop | 1532 | 283 | 0.1847 |
  17. | Join_Mobile | 343 | 78 | 0.2274 |
  18. | Lightbox_Desktop | 15254 | 2712 | 0.1778 |
  19. | Lightbox_Mobile | 4734 | 1029 | 0.2174 |
  20. | My Account_Desktop | 599 | 119 | 0.1987 |
  21. | My Account_Mobile | 166 | 20 | 0.1205 |
  22. | Other_Other | 17098 | 2759 | 0.1614 |
  23. | POS form_Physical | 17542 | 3241 | 0.1848 |
  24. | Sports/Brand_Online | 670 | 150 | 0.2239 |
  25. | Warehouse_Desktop | 999 | 206 | 0.2062 |
  26. | Warehouse_Mobile | 536 | 129 | 0.2407 |
  27. +---------------------+-------------------+--------------+--------+
  28. 16 ROWS IN SET (0.37 sec)

The ratios are expected to average around 18%, and the results look plausible.

Aggregate open rate based on acquisition date

Another similar criterion is to group the results by the customer's acquisition date (aq_dt). Here is the query for that.

 

  1. mysql> WITH sends_wider AS
  2. (SELECT sends.riid, sends.campaign_send_dt, sends.opened, user_info.aq_dt, user_info.sub_source FROM sends
  3. LEFT JOIN user_info ON sends.riid = user_info.riid) SELECT t1.aq_dt AS aq_dt, t1.total_daily_sends, t2.total_opened,
  4. (t2.total_opened/t1.total_daily_sends) AS ratio
  5. FROM (SELECT aq_dt, COUNT(*) AS total_daily_sends FROM sends_wider GROUP BY aq_dt ORDER BY aq_dt) AS t1
  6. LEFT JOIN (SELECT aq_dt, COUNT(*) AS total_opened FROM sends_wider WHERE opened = 1 GROUP BY aq_dt ORDER BY aq_dt) AS t2
  7. ON t1.aq_dt = t2.aq_dt;
  8. +---------------------+-------------------+--------------+--------+
  9. | aq_dt | total_daily_sends | total_opened | ratio |
  10. +---------------------+-------------------+--------------+--------+
  11. | 2010-01-06 00:00:00 | 1 | NULL | NULL |
  12. | 2010-01-11 00:00:00 | 1 | NULL | NULL |
  13. | 2010-01-13 00:00:00 | 7 | 5 | 0.7143 |
  14. | 2010-01-18 00:00:00 | 5 | NULL | NULL |
  15. | 2010-01-21 00:00:00 | 4 | NULL | NULL |
  16. ...
  17. ...
  18. ...
  19. | 2018-01-17 00:00:00 | 86 | 16 | 0.1860 |
  20. | 2018-01-18 00:00:00 | 38 | 10 | 0.2632 |
  21. | 2018-01-19 00:00:00 | 131 | 26 | 0.1985 |
  22. | 2018-01-20 00:00:00 | 64 | 18 | 0.2813 |
  23. | 2018-01-21 00:00:00 | 44 | 15 | 0.3409 |
  24. | 2018-01-22 00:00:00 | 29 | 3 | 0.1034 |
  25. | 2018-01-23 00:00:00 | 26 | 11 | 0.4231 |
  26. | 2018-01-24 00:00:00 | 25 | 10 | 0.4000 |
  27. | 2018-01-25 00:00:00 | 23 | 12 | 0.5217 |
  28. | 2018-01-26 00:00:00 | 25 | 10 | 0.4000 |
  29. | 2018-01-27 00:00:00 | 24 | 10 | 0.4167 |
  30. | 2018-01-28 00:00:00 | 31 | 6 | 0.1935 |
  31. | 2018-01-29 00:00:00 | 12 | 2 | 0.1667 |
  32. | 2018-01-30 00:00:00 | 25 | 5 | 0.2000 |
  33. | 2018-01-31 00:00:00 | 73 | 18 | 0.2466 |
  34. | 2018-02-01 00:00:00 | 34 | 17 | 0.5000 |
  35. | 2018-02-02 00:00:00 | 29 | 7 | 0.2414 |
  36. | 2018-02-03 00:00:00 | 34 | 11 | 0.3235 |
  37. | 2018-02-04 00:00:00 | 30 | 9 | 0.3000 |
  38. | 2018-02-05 00:00:00 | 25 | 7 | 0.2800 |
  39. | 2018-02-06 00:00:00 | 22 | 11 | 0.5000 |
  40. | 2018-02-07 00:00:00 | 46 | 11 | 0.2391 |
  41. | 2018-02-08 00:00:00 | 64 | 14 | 0.2188 |
  42. | 2018-02-09 00:00:00 | 52 | 16 | 0.3077 |
  43. | 2018-02-10 00:00:00 | 17 | 3 | 0.1765 |
  44. | 2018-02-11 00:00:00 | 10 | 4 | 0.4000 |
  45. | 2018-02-12 00:00:00 | 12 | 5 | 0.4167 |
  46. | 2018-02-13 00:00:00 | 12 | 2 | 0.1667 |
  47. | 2018-02-14 00:00:00 | 11 | 6 | 0.5455 |
  48. | 2018-02-15 00:00:00 | 8 | 4 | 0.5000 |
  49. | 2018-02-16 00:00:00 | 12 | 3 | 0.2500 |
  50. | 2018-02-17 00:00:00 | 10 | 1 | 0.1000 |
  51. | 2018-02-18 00:00:00 | 16 | 2 | 0.1250 |
  52. | 2018-02-19 00:00:00 | 5 | 2 | 0.4000 |
  53. | 2018-02-20 00:00:00 | 11 | 8 | 0.7273 |
  54. | 2018-02-21 00:00:00 | 13 | 3 | 0.2308 |
  55. | 2018-02-22 00:00:00 | 13 | 1 | 0.0769 |
  56. | 2018-02-23 00:00:00 | 13 | 4 | 0.3077 |
  57. | 2018-02-24 00:00:00 | 8 | 1 | 0.1250 |
  58. | 2018-02-25 00:00:00 | 9 | 5 | 0.5556 |
  59. | 2018-02-26 00:00:00 | 1 | NULL | NULL |
  60. +---------------------+-------------------+--------------+--------+
  61. 2815 ROWS IN SET (0.48 sec)

The user data appears to have some problems for the earlier dates (very few sends and NULL open counts). This was confirmed: those records were reported to have been migrated from a legacy email system and were not completely correct.
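Since grouping by the exact acquisition date produces 2,815 sparse rows, a monthly roll-up may be easier to read; the sketch below also avoids the NULL open counts by summing the opened flag directly instead of joining two aggregates.

  1. mysql> WITH sends_wider AS
  2. -> (SELECT sends.opened, user_info.aq_dt FROM sends
  3. -> LEFT JOIN user_info ON sends.riid = user_info.riid)
  4. -> SELECT DATE_FORMAT(aq_dt, '%Y-%m') AS aq_month, -- roll acquisition dates up to a month
  5. -> COUNT(*) AS total_sends, SUM(opened) AS total_opened,
  6. -> SUM(opened)/COUNT(*) AS ratio
  7. -> FROM sends_wider GROUP BY aq_month ORDER BY aq_month;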

To open or not

So far we have had no need for data-science tools, as most of the analysis was simple enough to handle through SQL. Let's try a slightly more involved analysis. Before a data scientist attempts a prediction, he or she has to work with some conjectures or hypotheses. These are theories based on intuition, and one must be able to either prove or disprove them from the data.

One such hypothesis is that a customer's propensity to open an email, as well as their propensity to unsubscribe (after opening the mail), may be related to their tenure. To test this theory we need a dataset that has the tenure (the difference between the mail's send date and the acquisition date) alongside each record in the "sends" table.

How do we get the tenure in the "sends" table? This is done through a simple JOIN statement as follows:

  1. mysql> SELECT sends.riid, sends.campaign_send_dt, sends.opened, user_info.aq_dt, user_info.sub_source,
  2. DATEDIFF(sends.campaign_send_dt, user_info.aq_dt) AS tenure FROM sends LEFT JOIN user_info
  3. ON sends.riid = user_info.riid LIMIT 10;
  4. +-----------+---------------------+--------+---------------------+-------------------+--------+
  5. | riid | campaign_send_dt | opened | aq_dt | sub_source | tenure |
  6. +-----------+---------------------+--------+---------------------+-------------------+--------+
  7. | 747246582 | 2018-02-19 00:00:00 | 0 | 2016-12-07 00:00:00 | Lightbox_Desktop | 439 |
  8. | 773178502 | 2018-02-04 00:00:00 | 0 | 2017-02-03 00:00:00 | Join_Desktop | 366 |
  9. | 89069862 | 2018-01-11 00:00:00 | 1 | 2010-10-23 00:00:00 | Other_Other | 2637 |
  10. | 899893722 | 2018-01-15 00:00:00 | 0 | 2017-11-08 00:00:00 | Footer_Mobile | 68 |
  11. | 88695502 | 2018-02-02 00:00:00 | 0 | 2011-03-06 00:00:00 | Other_Other | 2525 |
  12. | 939655902 | 2018-02-27 00:00:00 | 0 | 2017-12-31 00:00:00 | eReceipt_Physical | 58 |
  13. | 587652902 | 2018-02-15 00:00:00 | 0 | 2015-11-12 00:00:00 | Other_Other | 826 |
  14. | 865323662 | 2018-02-11 00:00:00 | 1 | 2017-08-20 00:00:00 | Checkout_Online | 175 |
  15. | 166559382 | 2018-01-19 00:00:00 | 0 | 2014-02-22 00:00:00 | POS form_Physical | 1427 |
  16. | 894975022 | 2018-01-01 00:00:00 | 0 | 2017-10-25 00:00:00 | Lightbox_Desktop | 68 |
  17. +-----------+---------------------+--------+---------------------+-------------------+--------+
  18. 10 ROWS IN SET (0.00 sec)

The statement above prints only 10 rows, but the tenure has been elegantly inserted as an extra column in this table. If our theory is correct, there should be a relationship between the "opened" state and tenure. Since the "opened" state is a discrete variable that takes only two values (0 and 1), we may be able to show a correlation by plotting a histogram of the opened state against tenure ranges.

Since I have already gotten my hands dirty with SQL, let me be a bit bold and write a SQL query that plots a sideways histogram using just SQL. Yes, you were expecting me to write a Python or R program to draw a beautiful histogram, but I am feeling a bit lazy now, and if SQL can give me a decent graphic, why not?

  1. mysql> SELECT ROUND(tenure, -2) AS tenure_bucket, COUNT(*) AS COUNT, RPAD('', LN(COUNT(*)), '*') AS bar
  2. FROM (SELECT sends.riid, sends.campaign_send_dt, sends.opened, user_info.aq_dt, user_info.sub_source,
  3. DATEDIFF(sends.campaign_send_dt, user_info.aq_dt) AS tenure FROM sends LEFT JOIN user_info
  4. ON sends.riid = user_info.riid WHERE opened = 1) AS histogram
  5. GROUP BY tenure_bucket
  6. ORDER BY tenure_bucket;
  7. +---------------+-------+----------+
  8. | tenure_bucket | COUNT | bar |
  9. +---------------+-------+----------+
  10. | 0 | 932 | ******* |
  11. | 100 | 2237 | ******** |
  12. | 200 | 1507 | ******* |
  13. | 300 | 1529 | ******* |
  14. | 400 | 1510 | ******* |
  15. | 500 | 1295 | ******* |
  16. | 600 | 929 | ******* |
  17. | 700 | 739 | ******* |
  18. | 800 | 830 | ******* |
  19. | 900 | 703 | ******* |
  20. | 1000 | 1486 | ******* |
  21. | 1100 | 623 | ****** |
  22. | 1200 | 409 | ****** |
  23. | 1300 | 378 | ****** |
  24. | 1400 | 349 | ****** |
  25. | 1500 | 512 | ****** |
  26. | 1600 | 374 | ****** |
  27. | 1700 | 309 | ****** |
  28. | 1800 | 254 | ****** |
  29. | 1900 | 369 | ****** |
  30. | 2000 | 275 | ****** |
  31. | 2100 | 160 | ***** |
  32. | 2200 | 191 | ***** |
  33. | 2300 | 220 | ***** |
  34. | 2400 | 116 | ***** |
  35. | 2500 | 102 | ***** |
  36. | 2600 | 110 | ***** |
  37. | 2700 | 80 | **** |
  38. | 2800 | 73 | **** |
  39. | 2900 | 23 | *** |
  40. | 3000 | 2 | * |
  41. +---------------+-------+----------+
  42. 31 ROWS IN SET (0.10 sec)

This SQL query needs some explanation. The inner query "select sends.riid, sends.campaign_send_dt, sends.opened, user_info.aq_dt, user_info.sub_source, DATEDIFF(sends.campaign_send_dt, user_info.aq_dt) AS tenure from sends left join user_info ON sends.riid = user_info.riid where opened = 1" simply creates a joined table containing only the entries that were opened. The rest of the query then creates buckets of 100 days each (via ROUND(tenure, -2)). The "COUNT" column is the number of entries in each bucket, and "bar" is a poor man's histogram (the bar length is the natural log of the count).

We see that most opens come from subscribers with low tenure; those with a higher tenure open far fewer emails. This may be explained by the fact that interest in the emails dwindles with time - which I believe is natural to expect.
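One caveat: the bars above are raw counts of opens, so they partly reflect how many emails were sent to each tenure group in the first place. To look at the open rate per tenure bucket instead, the same join can be aggregated without the opened = 1 filter; a hedged sketch (the same pattern works for the opt-out histogram in the next section):

  1. mysql> SELECT ROUND(tenure, -2) AS tenure_bucket, COUNT(*) AS total_sends,
  2. -> SUM(opened) AS total_opened,
  3. -> SUM(opened)/COUNT(*) AS open_rate -- fraction of sends opened in each 100-day bucket
  4. -> FROM (SELECT sends.opened, DATEDIFF(sends.campaign_send_dt, user_info.aq_dt) AS tenure
  5. -> FROM sends LEFT JOIN user_info ON sends.riid = user_info.riid) AS sends_tenure
  6. -> GROUP BY tenure_bucket ORDER BY tenure_bucket;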

To unsubscribe or not

A related problem is to figure out how tenure affects someone's likelihood of unsubscribing. For that we can use the "opens" table and run a similar query on the "optout" column. This will tell us whether people with a long tenure are more or less likely to unsubscribe. To continue the tradition, I am going to write this query in SQL again.

  1. mysql> SELECT ROUND(tenure, -2) AS tenure_bucket, COUNT(*) AS COUNT, RPAD('', LN(COUNT(*)), '*') AS bar
  2. FROM (SELECT opens.riid, opens.campaign_send_dt, opens.optout, user_info.aq_dt, user_info.sub_source,
  3. DATEDIFF(opens.campaign_send_dt, user_info.aq_dt) AS tenure FROM opens LEFT JOIN user_info
  4. ON opens.riid = user_info.riid WHERE optout = 1) AS histogram
  5. GROUP BY tenure_bucket
  6. ORDER BY tenure_bucket;
  7. +---------------+-------+--------+
  8. | tenure_bucket | COUNT | bar |
  9. +---------------+-------+--------+
  10. | 0 | 239 | ***** |
  11. | 100 | 288 | ****** |
  12. | 200 | 135 | ***** |
  13. | 300 | 113 | ***** |
  14. | 400 | 125 | ***** |
  15. | 500 | 75 | **** |
  16. | 600 | 52 | **** |
  17. | 700 | 44 | **** |
  18. | 800 | 40 | **** |
  19. | 900 | 39 | **** |
  20. | 1000 | 91 | ***** |
  21. | 1100 | 20 | *** |
  22. | 1200 | 15 | *** |
  23. | 1300 | 10 | ** |
  24. | 1400 | 8 | ** |
  25. | 1500 | 16 | *** |
  26. | 1600 | 8 | ** |
  27. | 1700 | 8 | ** |
  28. | 1800 | 10 | ** |
  29. | 1900 | 9 | ** |
  30. | 2000 | 5 | ** |
  31. | 2100 | 9 | ** |
  32. | 2200 | 7 | ** |
  33. | 2300 | 2 | * |
  34. | 2400 | 1 | |
  35. | 2500 | 2 | * |
  36. | 2700 | 1 | |
  37. | 2800 | 2 | * |
  38. +---------------+-------+--------+
  39. 28 ROWS IN SET (0.05 sec)

It is clear from the histogram above that people with less tenure tend to opt out of email campaigns quickly, while those who have been subscribers for a long time hesitate to opt out. The bulk of the opt-outs occurs within roughly the first 200 days.

Predicting a customer's reaction

I am finally going to solve a real data-science problem with this dataset. For those of you who believe I have been dilly-dallying with SQL all this while, this part should be interesting.

We would like to predict the likelihood that a customer will open an email sent on a given day in the future for a given campaign type. Is it also possible to predict whether the person who opens the email will unsubscribe? Let us first create the dataset and then fit a model on it.

To solve this problem, I would like to create a dataset that has the following columns.

  1. User id (riid)
  2. Date (just for reference, not to be used in dataset)
  3. Tenure on that day
  4. Type of campaign (camptype)
  5. Source of user's email address (sub_source)
  6. Number of times the user opened the email that day
  7. Whether the user unsubscribed from the list that day
  8. Sum total of items purchased by an individual on a given day

Most of the information is in the "events" table, but it needs to be augmented with additional information from the other tables. To do so we have to perform a few more joins. Ideally I would like to do everything in one join statement as follows:

 

  1. CREATE TABLE emailcampaign.dataset
  2. SELECT events.riid, events.event, events.event_captured_dt, events.cnts, user_info.aq_dt, user_info.sub_source,
  3. DATEDIFF(events.event_captured_dt, user_info.aq_dt) AS tenure, campaign_types.camptype, opens.optout
  4. FROM events
  5. LEFT JOIN user_info
  6. ON events.riid = user_info.riid
  7. LEFT JOIN campaign_types ON events.event_captured_dt = campaign_types.launch_id
  8. LEFT JOIN opens ON opens.campaign_send_dt = events.event_captured_dt AND opens.riid = events.riid;

But alas, the data size is too large to do this in one shot. We have about 8.3 million records in the events table! So we have to break up this query into several parts and do some indexing in between to make it run faster. Here is the trick.

We would like to do the joins in three stages - joining one table at a time and saving the result to a new table. (This is similar to the MapReduce paradigm, where each MapReduce step writes its result to disk before the next stage begins.) Here are the steps for this problem. We are creating three tables called dataset1, dataset2 and dataset3 - each of them bigger than the previous one, pulling in columns from different tables. Our final table will be dataset3, which will be used for the machine learning exercise.

Here is how you will create the first dataset.

  1. mysql> CREATE TABLE emailcampaign.dataset1
  2. -> SELECT events.riid, events.event, events.event_captured_dt, events.cnts,
  3. -> user_info.aq_dt, user_info.sub_source,
  4. -> DATEDIFF(events.event_captured_dt, user_info.aq_dt) AS tenure
  5. -> FROM events
  6. -> LEFT JOIN user_info
  7. -> ON events.riid = user_info.riid;
  8. Query OK, 8358444 ROWS affected (2 MIN 58.12 sec)
  9. Records: 8358444 Duplicates: 0 Warnings: 0
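Since a LEFT JOIN can multiply rows when the join key is duplicated on the right-hand side, a quick sanity check on user_info is worthwhile before trusting that row count; for example:

  1. mysql> # if these two counts match, riid is unique and the LEFT JOIN cannot fan out rows
  2. mysql> SELECT COUNT(*) AS total_rows, COUNT(DISTINCT riid) AS distinct_riids FROM user_info;
  3. mysql> SELECT COUNT(*) FROM dataset1;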

Notice that we have the same number of rows in table dataset1 as in the events table, which is consistent with riid being unique in user_info. The next stage pulls in more data from the campaign_types table:

  1. mysql> CREATE TABLE emailcampaign.dataset2
  2. -> SELECT dataset1.riid, dataset1.event, dataset1.event_captured_dt, dataset1.cnts,
  3. -> dataset1.aq_dt, dataset1.sub_source,
  4. -> dataset1.tenure, campaign_types.camptype
  5. -> FROM dataset1
  6. -> LEFT JOIN campaign_types
  7. -> ON dataset1.event_captured_dt = campaign_types.launch_id
  8. -> ;
  9. Query OK, 8358444 ROWS affected (2 MIN 59.94 sec)
  10. Records: 8358444 Duplicates: 0 Warnings: 0

Before we do the final join, it is necessary to make the database queries efficient, as this join is quite expensive. So let's create some indices. I am going to create four indices as follows:

  1. mysql> CREATE INDEX idx1 ON opens (campaign_send_dt);
  2. Query OK, 0 ROWS affected (0.37 sec)
  3. Records: 0 Duplicates: 0 Warnings: 0
  4.  
  5. mysql> CREATE INDEX idx2 ON dataset2 (event_captured_dt);
  6. Query OK, 0 ROWS affected (20.83 sec)
  7. Records: 0 Duplicates: 0 Warnings: 0
  8.  
  9. mysql> CREATE INDEX idx3 ON opens (riid);
  10. Query OK, 0 ROWS affected (0.25 sec)
  11. Records: 0 Duplicates: 0 Warnings: 0
  12.  
  13. mysql> CREATE INDEX idx4 ON dataset2 (riid);
  14. Query OK, 0 ROWS affected (20.78 sec)
  15. Records: 0 Duplicates: 0 Warnings: 0

The final join brings in data from the opens table into this composite table.

  1. mysql> CREATE TABLE emailcampaign.dataset3
  2. -> SELECT dataset2.riid, dataset2.event, dataset2.event_captured_dt, dataset2.cnts,
  3. -> dataset2.aq_dt, dataset2.sub_source,
  4. -> dataset2.tenure, dataset2.camptype, opens.optout
  5. -> FROM dataset2
  6. -> LEFT JOIN opens
  7. -> ON opens.campaign_send_dt = dataset2.event_captured_dt AND opens.riid = dataset2.riid;
  8. Query OK, 8358444 ROWS affected (4 MIN 25.61 sec)
  9. Records: 8358444 Duplicates: 0 Warnings: 0

It turns out that the data came out in a slightly different format than what I had expected, but it is workable. Here is a small sample of the resulting table which will be our starting dataset for the rest of the machine learning exercise.

  1. mysql> SELECT * FROM dataset3 WHERE camptype IS NOT NULL AND optout IS NOT NULL LIMIT 10;
  2. +-----------+-------+---------------------+------+---------------------+-------------------+--------+-----------------------------------+--------+
  3. | riid | event | event_captured_dt | cnts | aq_dt | sub_source | tenure | camptype | optout |
  4. +-----------+-------+---------------------+------+---------------------+-------------------+--------+-----------------------------------+--------+
  5. | 91355862 | SENT | 2018-02-27 00:00:00 | 1 | 2011-02-21 00:00:00 | POS form_Physical | 2563 | Heavy Promo - Dedicated Messaging | 0 |
  6. | 86264082 | SENT | 2018-02-15 00:00:00 | 1 | 2010-04-29 00:00:00 | Other_Other | 2849 | Heavy Promo - Dedicated Messaging | 0 |
  7. | 111704342 | SENT | 2018-01-08 00:00:00 | 1 | 2012-07-28 00:00:00 | POS form_Physical | 1990 | Heavy Promo - Secondary Messaging | 0 |
  8. | 94780402 | SENT | 2018-02-01 00:00:00 | 1 | 2015-03-31 00:00:00 | Checkout_Online | 1038 | Heavy Promo | 0 |
  9. | 91122822 | SENT | 2018-01-31 00:00:00 | 1 | 2011-02-04 00:00:00 | POS form_Physical | 2553 | Heavy Promo - Dedicated Messaging | 0 |
  10. | 115866522 | SENT | 2018-01-25 00:00:00 | 1 | 2012-10-18 00:00:00 | Other_Other | 1925 | Light Promo | 0 |
  11. | 104867102 | SENT | 2018-02-12 00:00:00 | 1 | 2012-05-07 00:00:00 | Other_Other | 2107 | Light Promo - Secondary Messaging | 0 |
  12. | 131624382 | SENT | 2018-02-08 00:00:00 | 1 | 2014-05-31 00:00:00 | Checkout_Online | 1349 | No Promo | 0 |
  13. | 145310062 | SENT | 2018-01-23 00:00:00 | 1 | 2015-03-31 00:00:00 | Checkout_Online | 1029 | Light Promo - Secondary Messaging | 0 |
  14. | 86463002 | SENT | 2018-01-18 00:00:00 | 1 | 2010-05-05 00:00:00 | Other_Other | 2815 | Light Promo - Secondary Messaging | 0 |
  15. +-----------+-------+---------------------+------+---------------------+-------------------+--------+-----------------------------------+--------+
  16. 10 ROWS IN SET (0.00 sec)
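Before exporting, it is worth checking how many rows actually have both camptype and optout populated, since that is the subset we will model on; a quick count:

  1. mysql> SELECT COUNT(*) FROM dataset3 WHERE camptype IS NOT NULL AND optout IS NOT NULL;

On this data the filtered subset turns out to be far smaller than the full 8.3 million rows - the 186,584 records mentioned in the next section.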

I am going to take one more step in processing the data to make my life simpler for the next stage. I am about to use a standard machine learning package to process the data, so I am going to export the entire dataset to a file format that is friendly to most packages. Yes, it is possible to read the data from the database directly (and I will show that in a different article some day), but at the moment I want to solve this problem with the tools already available in the machine learning package, without writing anything complicated.

Therefore I am going to export the entire table into CSV format so that I can read it from a file. The export is done simply through the following statement:

  1. mysql> SELECT riid, event, event_captured_dt, cnts, aq_dt, sub_source, tenure, camptype, optout
  2. -> FROM dataset3 WHERE camptype IS NOT NULL AND optout IS NOT NULL INTO OUTFILE '/tmp/emailcampaign_dataset.csv'
  3. -> FIELDS TERMINATED BY ',' LINES TERMINATED BY '\r\n';
  4. Query OK, 8358444 ROWS affected (32.38 sec)

Note that I am only including the records where both camptype and optout are non-NULL (optout being 0 or 1). The resulting file looks like this:

  1. riid,event,event_captured_dt,cnts,aq_dt,sub_source,tenure,camptype,optout
  2. 91355862,SENT,2018-02-27 00:00:00,1,2011-02-21 00:00:00,POS form_Physical,2563,Heavy Promo - Dedicated Messaging,0
  3. 86264082,SENT,2018-02-15 00:00:00,1,2010-04-29 00:00:00,Other_Other,2849,Heavy Promo - Dedicated Messaging,0
  4. 111704342,SENT,2018-01-08 00:00:00,1,2012-07-28 00:00:00,POS form_Physical,1990,Heavy Promo - Secondary Messaging,0
  5. 94780402,SENT,2018-02-01 00:00:00,1,2015-03-31 00:00:00,Checkout_Online,1038,Heavy Promo,0
  6. 91122822,SENT,2018-01-31 00:00:00,1,2011-02-04 00:00:00,POS form_Physical,2553,Heavy Promo - Dedicated Messaging,0
  7. 115866522,SENT,2018-01-25 00:00:00,1,2012-10-18 00:00:00,Other_Other,1925,Light Promo,0
  8. 104867102,SENT,2018-02-12 00:00:00,1,2012-05-07 00:00:00,Other_Other,2107,Light Promo - Secondary Messaging,0
  9. 131624382,SENT,2018-02-08 00:00:00,1,2014-05-31 00:00:00,Checkout_Online,1349,No Promo,0
  10. 145310062,SENT,2018-01-23 00:00:00,1,2015-03-31 00:00:00,Checkout_Online,1029,Light Promo - Secondary Messaging,0
  11. 86463002,SENT,2018-01-18 00:00:00,1,2010-05-05 00:00:00,Other_Other,2815,Light Promo - Secondary Messaging,0
  12. ...
  13. ...

You can download the CSV file to look at it in its entirety.

Predicting likelihood that the user will open the email

With the dataset thus created, we are now ready to fit a machine learning model on it. The full dataset had about 8.3 million records, but after eliminating all records with either a NULL camptype or a NULL optout field, only 186,584 records remain. Our job is to make a prediction based on those. I would like to fit a machine learning model using this entire dataset, and for that I am going to use Apache Spark as my toolset. Since the output is binary (0 if the customer stays subscribed, 1 if they opt out), I am going to use LogisticRegression to fit the model.

You will find the entire code, written in Java, on GitHub. Clone the repository to follow along.

The only prerequisite for running this program is Apache Spark, which you need to download and install on your computer first. I will be using Spark in standalone mode, so there should be no difficulty running it. Make sure that 'spark-submit' is on your $PATH.

I have laid out the repository so that the dataset is in the 'data' folder under the repository root. After cloning the repository, look at the Java file CustomerOpenResponsePredictor.java. Let us walk through the main() function of this class to see the logic behind it.

  1. SparkSession spark = SparkSession
  2. .builder()
  3. .appName("CustomerOpenResponsePredictor")
  4. .getOrCreate();
  5.  
  6. // Load training data
  7. Dataset allrows = spark.read().format("csv")
  8. .option("header", "true")
  9. .option("inferSchema", "true")
  10. .load("data/emailcampaign_dataset.csv");
  11.  
  12. StringIndexer eventIndexer = new StringIndexer().setHandleInvalid("keep").setInputCol("event").setOutputCol("eventIndex");
  13. StringIndexer subsourceIndexer = new StringIndexer().setHandleInvalid("keep").setInputCol("sub_source").setOutputCol("sub_sourceIndex");
  14. StringIndexer camptypeIndexer = new StringIndexer().setHandleInvalid("keep").setInputCol("camptype").setOutputCol("camptypeIndex");
  15.  
  16. Dataset indexedRows1 = eventIndexer.fit(allrows).transform(allrows);
  17. Dataset indexedRows2 = subsourceIndexer.fit(indexedRows1).transform(indexedRows1);
  18. Dataset indexedRows = camptypeIndexer.fit(indexedRows2).transform(indexedRows2);
  19.  
  20. indexedRows.printSchema();
  21.  
  22. Dataset[] trainingAndTest = indexedRows.randomSplit(new double[] {0.8, 0.2});
  23.  
  24. Dataset trainingSet = trainingAndTest[0];
  25. Dataset testSet = trainingAndTest[1];

The first task is to create the Spark session, which happens in line 1. In lines 7-10 I load the data from the CSV file. (Note that since this is multi-threaded, the data will be read in parallel in chunks by different threads.) This dataset has three fields that are given as strings but can be converted to categorical variables (i.e. they are basically enumerations of state). Spark offers a way to do this through the StringIndexer class. Lines 12-14 define three indexers that convert the columns event, sub_source and camptype to eventIndex, sub_sourceIndex and camptypeIndex respectively. The remaining variables are all integers, so LogisticRegression will work fine with them. Lines 16-18 do the actual conversion of these three columns from String to numeric indices. The next step is to split the data randomly into two sets - one for training and another for validation. I use the randomSplit() function to do that in lines 22 to 25. After this, I have two datasets with an 80%-20% split.

Feature Set and Output Value

To be clear, here are all the parameters in the feature set (the input X vector):

  1. eventIndex - the enumerated version of the type of event
  2. cnts - the count recorded for that event type on that day (opens, sends, clicks or items purchased)
  3. sub_sourceIndex - the enumerated version of the source of email
  4. tenure - the amount of time the user has been a subscriber
  5. camptypeIndex - the enumerated version of the type of campaign

The output (y) is a binary value "optout" which represents whether the user will stay (represented by 0) or unsubscribe (represented by 1).

  1. VectorAssembler assembler = new VectorAssembler()
  2. .setInputCols(new String[]{"eventIndex", "cnts", "sub_sourceIndex", "tenure", "camptypeIndex"})
  3. .setOutputCol("features");
  4.  
  5.  
  6. LogisticRegression lr = new LogisticRegression()
  7. .setMaxIter(10)
  8. .setRegParam(0.3)
  9. .setElasticNetParam(0.8)
  10. .setFeaturesCol("features") // setting features column
  11. .setLabelCol("optout"); // setting label column
  12.  
  13. Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {assembler,lr});
  14.  
  15. PipelineModel lrModel = pipeline.fit(trainingSet);
  16.  
  17. Dataset predictions = lrModel.transform(testSet);
  18.  
  19. // [sub_source, event_captured_dt, aq_dt, sub_sourceIndex, rawPrediction, cnts, probability, camptypeIndex,
  20. // riid, tenure, event, camptype, prediction, eventIndex, optout, features]
  21. int correctCount = 0, allCount = 0;
  22. for (Row r : predictions.select("prediction","optout").collectAsList()) {
  23. if (Math.abs((r.getDouble(0) - (double)r.getInt(1))) < 0.001)
  24. correctCount++;
  25. allCount++;
  26. }
  27.  
  28. System.out.println("Percentage correct = " + 100*((double)correctCount/(double)allCount));
  29.  
  30. spark.stop();

Note that not all the columns present in the dataset should be fed into the LogisticRegression model builder; only certain meaningful columns must be selected for model building. I use the VectorAssembler class to create my feature set. Lines 1-3 create a composite output column called "features" that comprises the columns eventIndex, cnts, sub_sourceIndex, tenure and camptypeIndex. These are our input variables. The LogisticRegression model is configured in lines 6-11 with a maximum of 10 iterations, a regularization parameter of 0.3 and an elastic-net parameter of 0.8. The input feature set is the "features" column I just created and the label is the "optout" column. Lines 13 to 15 are the Spark steps to create a workflow pipeline; line 15 fits the actual model on the training set. Line 17 uses the test dataset to make predictions based on this model. Lines 21 to 26 compute the accuracy of the model by simply comparing the predicted value with the original value of optout.

One point to note about software versions: I am using Spark 2.3.1 and JDK 1.8 to do my work. This combination worked for me. (JDK 10 has some compatibility problems with Spark 2.3, so avoid this combination.)

To compile type the following:

  1. mvn clean package

To run the program type:

  1. bin/run-emailcampaign-learner.sh

Your output is going to look somewhat like this:

  1. bin/run-emailcampaign-learner.sh
  2. Running Email Campaign Learner using SPARK ...
  3. spark-submit --master local[*] --class com.anupambagchi.emailcampaign.CustomerOpenResponsePredictor /Users/anupambagchi/Developer/machine-learning-using-spark/bin/../target/spark.application-1.0-jar-with-dependencies.jar
  4. 18/09/03 20:43:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  5. root
  6. |-- riid: integer (nullable = true)
  7. |-- event: string (nullable = true)
  8. |-- event_captured_dt: timestamp (nullable = true)
  9. |-- cnts: integer (nullable = true)
  10. |-- aq_dt: timestamp (nullable = true)
  11. |-- sub_source: string (nullable = true)
  12. |-- tenure: integer (nullable = true)
  13. |-- camptype: string (nullable = true)
  14. |-- optout: integer (nullable = true)
  15. |-- eventIndex: double (nullable = false)
  16. |-- sub_sourceIndex: double (nullable = false)
  17. |-- camptypeIndex: double (nullable = false)
  18.  
  19. Percentage correct = 98.41830135805131

Since the split between the training and test datasets is random, the results differ slightly on each run. For most of my runs, the accuracy of the model varies between 98.2 and 98.5 percent. This is pretty good given how much data we had to crunch to reach this conclusion, though with opt-outs being rare (about 1.4% of opens in aggregate), accuracy alone should be read with some caution.

How to use this information?

Now the big question is: what can we do about it? Since we can predict quite accurately whether a user will unsubscribe, it is best not to send the email to users who are predicted to opt out. The next step in deploying this model is to keep it running as a web service, and before sending an email, check (through the web service) whether the user is likely to opt out. If so, suppress that email. Awesome! We have found an innovative way to retain customers.

Conclusion

I am not going to disappoint anyone who came to this page expecting nuggets of practical information and instead found a bunch of SQL and mathematics. Here are some tips for actually sending email for your campaigns, reproduced below for your reference.

  • Try sending email on weekends: Based on data of over 9 billion emails (yes, billion with a “b”) provided by Mailchimp, Saturdays and Sundays have notably higher Click Thru Rates (CTRs); this is coupled with said data showing unsubscribes peak on Tuesdays. These rates include both B2B and B2C emails (although given the volume, are presumably skewed towards B2C.)

  • Send very early in the morning: Data layered over HubSpot survey results reveals that while recipients report reading email throughout the day, CTRs peak between 6 and 7 am (local time). And while unsubscribes also peak in the morning, they also spike late in the evening (when readership drops off).

  • Optimize your email for mobile: a bit of a no-brainer; surveys cited 80% of the respondents read email on mobile devices, highlighting the importance of making sure your email doesn’t look like scrambled eggs on mobile.

  • Include reference data in your email: Make email searchable. Focus group participants report using email inboxes as an archive of ‘elite’, personal data, referring back to it on an informational basis.

  • Use lots of links in your email: While this may be counterintuitive, Zarrella says there is strong correlation between a greater number of links and higher CTRs. Data also shows lower unsubscribes as the number of links increase. (This may be, of course, because the unsubscribe link is tougher to find…)

  • Serialize and label your email: Using continuity and content-based words such as “[this] week’s,” “newsletter,” or “digest” in the subject line leads to the higher CTRs. Conversely, the traditional “spam” words continue to hold true. [Bonus hint: monitor your spam box for common “trigger” words to avoid.]

  • Give your subscribers special access: Focus groups find people like getting offers specific to them, offers with exclusivity built into them. Another no-brainer, but worth repeating.

  • Send email from people they’ve heard of: Be it a celebrity name or a guru, people like receiving emails from names they recognize.

  • Do not be afraid to send too much email: Unsubscribes are notably higher for organizations that send one or two emails per month; as the frequency of emails reaches eight the number declines.

  • Your newest subscribers are your best: While most subscribers opt out shortly after first subscribing to an email, CTRs early on are also at a high – proving the adage “get ‘em while they’re hot.”

  • Ask people to follow you, not forward emails: It’s not just using social media, but using it wisely. Survey data showed about 80% people either never or rarely forward or Tweet commercial email, even with the advent of ‘share’ and ‘tweet’ buttons. Instead, get people to follow you through Facebook, Twitter, etc. driving prospects to subscribe to your email.

  • Make them want to get your emails: 70% of people report reading most or all of their email, and 58% have separate "junk" inboxes. Given that, Zarrella stressed incorporating all the best-practice takeaways detailed above to ensure your message gets to people's "good" email address, gets read, and gets acted upon.

So that's it folks. You are now an Email marketing ninja. Go out and start practicing some of these ideas.

In the first and second parts of this series I described how to set up the hardware to read data from the OBD port using a Raspberry Pi and then upload it to the cloud. You should read the first and second parts before you read this article.

Having done all the work to capture and transport all the data to the cloud, let us figure out what can be done on the cloud to introduce Machine Learning. To understand the concepts given in this article you will need to be familiar with Javascript and Python. Also, I am using MongoDB as my database - so you will need to know the basics of a document-oriented database to follow the example code here. MongoDB is not only my storage engine here, but also my compute engine. By that, I mean that I am using the database's scripting language (Javascript) to pre-process data that will ultimately be used for machine learning. The Javascript code given herein executes (in a distributed manner) inside the MongoDB database. (Some people get confused when they see Javascript, assuming that it requires a server like NodeJS to run - not here.)

Introduction to the Solution

To set the stage, I will describe the following three tasks in this article:

  1. Read the raw records and augment them with additional derived information to generate extra features for machine learning that are not directly sent by the Raspberry Pi. Derivatives based on the time interval between readings can be used to derive instantaneous velocity, angular velocity and incline. These are inserted into the derived records when we save them to another MongoDB collection (also known as a 'table' in relational-database parlance) to be used later for generating the feature sets. I will be calling this collection 'mldataset' in my database.
  2. Read the 'mldataset' and extract features from the records. The feature set is saved into another collection called 'vehicle_signature_records'. This is an involved process since there are so many fields in the raw records. In my case, the features are basically three statistical aggregates (minimum, average and maximum) of all values over a 15-second period. Other research papers on this subject take the same approach, but the time interval over which the aggregates are taken varies based on the frequency of the readings. The recommended frequency is 5 Hz, i.e. one record-set per 0.2 seconds. But as I mentioned in article 2 of this series, we are unable to read data that fast over a serial connection to the ELM 327. The maximum speed I have been able to observe (mostly in modern cars) is one record-set every 0.7 seconds. Thus a 15-second aggregation makes more sense in our scenario. The accuracy of the prediction may suffer as a result - but we will accept that as a constraint. The solution methodology remains the same.
  3. Apply a learning algorithm on the feature set to learn the driver behavior. The model then needs to be deployed on a machine in the cloud. In real time we calculate the same aggregates over the same time interval (15 seconds) and feed them into the model to come up with a prediction. To confirm the driver we will need to take readings over several intervals (5 minutes gives 20 predictions) and then use the value with the maximum count (i.e. the modal value).

Augmenting raw data with derivatives

This is a very common scenario in IoT applications. When data is generated by a human being, it usually has useful information right at the surface - all you need to do is scan the logs and extract it. An example is finding the interests of a user from user-clicks in a shopping-cart scenario: all the items the user has seen on the web-site are directly found in the logs. An IoT device, however, is dumb; it has no emotion and no special interests. All data coming from an IoT device is the same consistent, boring stream. So where do you dig to find useful information? The answer is in the time-derivatives. The variation in values from one reading to the next provides useful insight into behavior. Examples are velocity (the derivative of displacement found from GPS readings), acceleration (the derivative of velocity) and jerk (the derivative of acceleration). So you see, augmenting raw data with this extra information is extremely useful for IoT applications.

In this step I am going to write some Javascript code (that runs inside the MongoDB database) to augment raw data with derivatives for each record. You will find all this code in the file 'extract_driver_features_from_car_readings.js' which is located inside the 'machinelearning' folder. If you are wondering where to find the code, it is in Github at this location https://github.com/anupambagchi/driver-signature-raspberry-pi.

Processing raw records

Before diving into the code, let me clarify a few things. The code is written to run as a cron job on a machine on the same network as the MongoDB database, so that the database is accessible. Since it runs as a cron task, we need to know how many records to process from the raw data table, which means we have to do some book-keeping in the database. We have a special collection called 'book_keeping' for this purpose. One of the things we store there is the timestamp up to which we have already processed the records. The data (in JSON format) may look like this:

  1. {
  2. "_id" : "processed_until",
  3. "lastEndTime" : ISODate("2017-12-08T23:56:56.724+0000")
  4. }

To determine where we need to pick up the record processing from, here is one way to do this in a MongoDB script written in Javascript.

  1. // Look up the book-keeping table to figure out the time from where we need to pick up
  2. var processedUntil = db.getCollection('book_keeping').findOne( { _id: "processed_until" } );
  3. endTime = new Date(); // Set the end time to the current time
  4. if (processedUntil != null) {
  5. startTime = processedUntil.lastEndTime;
  6. } else {
  7. db.book_keeping.insert( { _id: "processed_until", lastEndTime: endTime } );
  8. startTime = new Date(endTime.getTime() - (365*86400000)); // Go back 365 days
  9. }

The 'else' part of the logic above is for the initialization phase, when we run the script for the first time - in that case we just pick up all records for the past year.

Keeping track of driver vehicle combination

Another book-keeping task is to keep track of the driver-vehicle combinations. To make the job easier for the machine learning algorithm, these should be converted to indices. Those indices are maintained in another book-keeping document called 'driver_vehicles', which looks somewhat like this:

  1. {
  2. "_id" : "driver_vehicles",
  3. "drivers" : {
  4. "gmc-denali-2015_jonathan" : 0.0,
  5. "gmc-denali-2015_mindy" : 1.0,
  6. "gmc-denali-2015_charles" : 2.0,
  7. "gmc-denali-2015_chris" : 3.0,
  8. "gmc-denali-2015_elise" : 4.0,
  9. "gmc-denali-2015_thomas" : 5.0,
  10. "toyota-camry-2009_alice" : 6.0,
  11. "gmc-denali-2015_andrew" : 7.0,
  12. "toyota-highlander-2005_arka" : 8.0,
  13. "subaru-outback-2015_john" : 9.0,
  14. "gmc-denali-2015_grant" : 10.0,
  15. "gmc-denali-2015_avni" : 11.0,
  16. "toyota-highlander-2005_anupam" : 12.0,
  17. }
  18. }

These are the names of vehicles paired with their drivers. Each combination has been assigned a number. When a driver-vehicle combination is encountered, the program checks whether that combination already exists; if not, it adds a new one. Here is the code to do it.

  1. // Another book-keeping task is to read the driver-vehicle hash-table from the database
  2. // Look up the book-keeping table to figure out the previous driver-vehicle codes (we have
  3. // numbers representing the combination of drivers and vehicles).
  4. var driverVehicles = db.getCollection('book_keeping').findOne( { _id: "driver_vehicles" } );
  5. var drivers;
  6. if (driverVehicles != null)
  7. drivers = driverVehicles.drivers;
  8. else
  9. drivers = {};
  10.  
  11. var maxDriverVehicleId = 0;
  12. for (var key in drivers) {
  13. if (drivers.hasOwnProperty(key)) {
  14. maxDriverVehicleId = Math.max(maxDriverVehicleId, drivers[key]);
  15. }
  16. }

You can see that a 'findOne' call is made to the MongoDB database to read the hash-table into memory.

The next task is to query the raw collection to find out which records are new since the last time it ran.

  1. // Now do a query of the database to find out what records are new since we ran it last
  2. var allNewCarDrivers = db.getCollection('car_readings').aggregate([
  3. {
  4. "$match": {
  5. "timestamp" : { $gt: startTimeStr, $lte: endTimeStr }
  6. }
  7. },
  8. {
  9. "$unwind": "$data"
  10. },
  11. {
  12. "$group": { _id: "$data.username" }
  13. }
  14. ]);

SQL vs. Document-oriented database

The next part is the crux of the actual processing in this script. To understand how it works you need to be familiar with the MongoDB aggregation framework. A task like this would take far too much code if you started writing SQL. Most relational databases, and also Spark, offer SQL as a way to process and aggregate data. The most common reason I have heard from managers for taking that approach is that "it is easy". That works, but it is too verbose. That is why I personally prefer the MongoDB aggregation framework for pre-processing - I can move much faster with it than with the other tools out there. It may not be "easy" as per common belief, but a bit more effort spent studying the aggregation framework pays off, saving a lot of development effort.

What about execution time? These scripts execute on the database nodes - inside the database. You can hardly make it any faster, since most of the time spent dealing with large data goes into transporting it from the storage nodes to the execution nodes. With the aggregation framework you get the benefits of BigData processing essentially for free: you are using in-database analytics, which gives the fastest execution time.

  1. // Look at all the records returned and process each driver one-by-one
  2. // The following query is a pipeline with the following steps:
  3. allNewCarDrivers.forEach(function(driverId) {
  4. var driverName = driverId._id;
  5. print("Processing driver: " + driverName);
  6. var allNewCarReadings = db.getCollection('car_readings').aggregate([
  7. {
  8. "@match": { // 1. Match all records that fall within the time range we have decided to use. Note that this is being
  9. // done on a live database - which means that new data is coming in while we are trying to analyze it.
  10. // Thus we have to pin both the starting time and the ending time. Pinning the endtime to the starting time
  11. // of the application ensures that we will be accurately picking up only the NEW records when the program
  12. // runs again the next time.
  13. "timestamp" : { @gt: startTimeStr, @lte: endTimeStr }
  14. }
  15. },
  16. {
  17. @project: { // We only need to consider a few fields for our analysis. This eliminates the summaries from our analysis.
  18. "timestamp": 1,
  19. "data" : 1,
  20. "account": 1,
  21. "_id": 0
  22. }
  23. },
  24. {
  25. @unwind: "@data" // Flatten out all records into one gigantic array of records
  26. },
  27. {
  28. @match: {
  29. "data.username": driverName // Only consider the records for this specific driver, ignoring all others.
  30. }
  31. },
  32. {
  33. @sort: {
  34. "data.eventtime": 1 // Finally sort the data based on eventtime in ascending order
  35. }
  36. }
  37. ]);

The nifty script above does a lot of things. The first thing to note is that we are operating on a live database with a constant stream of data coming in. So, in order to select records for processing, we need to decide on a time range first and select only those records that fall within it. The next time we run this script, the records that could not be picked up this time will be gathered and processed. This is all done within the 'match' clause.

The second clause is the 'project' clause, which selects only the four required fields for the next stage of the pipeline. The 'unwind' clause flattens the data arrays. The next 'match' clause keeps only the records for the driver in question, and the final 'sort' clause sorts the data by eventtime in ascending order.
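Because of the '@' substitution mentioned in the note above, it may help to see the same pipeline written with the real '$' operators. Here is a rough pymongo equivalent - a sketch with placeholder values, where the database name 'obd2' and the collection name 'car_readings' come from this article and the connection settings are assumptions:

  1. from pymongo import MongoClient
  2.  
  3. client = MongoClient('localhost', 27017)  # assumed connection settings
  4. db = client['obd2']
  5.  
  6. start_time_str = '2017-07-01T00:00:00'    # placeholder time window
  7. end_time_str = '2017-07-31T23:59:59'
  8. driver_name = 'anupam'                    # placeholder driver
  9.  
  10. pipeline = [
  11.     {'$match': {'timestamp': {'$gt': start_time_str, '$lte': end_time_str}}},
  12.     {'$project': {'timestamp': 1, 'data': 1, 'account': 1, '_id': 0}},
  13.     {'$unwind': '$data'},
  14.     {'$match': {'data.username': driver_name}},
  15.     {'$sort': {'data.eventtime': 1}}
  16. ]
  17. for doc in db['car_readings'].aggregate(pipeline):
  18.     print(doc['data']['eventtime'])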

Distance on earth between two points

Before proceeding further, I would like to get one thing out of the way. Since we are dealing with a lot of latitude-longitude pairs, and subsequently trying to find displacement, velocity and acceleration, we need a way to calculate the distance between two points on earth. There are several algorithms with varying degrees of accuracy; the haversine formula below is the one I have found to be accurate enough while remaining cheap to compute (use it if your database vendor does not already provide a distance function).

  1. // Number.prototype.toRadians is not built into JavaScript; the script assumes the usual
  2. // degree-to-radian helper, defined somewhere before this function is called:
  3. if (Number.prototype.toRadians === undefined)
  4. Number.prototype.toRadians = function() { return this * Math.PI / 180; };
  5.  
  6. function earth_distance_havesine(lat1, lon1, lat2, lon2, unit) {
  7. var radius = 3959; // mean radius of the earth in miles
  8. var phi1 = lat1.toRadians();
  9. var phi2 = lat2.toRadians();
  10. var delphi = (lat2-lat1).toRadians();
  11. var dellambda = (lon2-lon1).toRadians();
  12.  
  13. var a = Math.sin(delphi/2) * Math.sin(delphi/2) +
  14. Math.cos(phi1) * Math.cos(phi2) *
  15. Math.sin(dellambda/2) * Math.sin(dellambda/2);
  16. var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
  17.  
  18. var dist = radius * c;
  19. if (unit=="K") { dist = dist * 1.609344 } // kilometres
  20. if (unit=="N") { dist = dist * 0.8684 } // nautical miles
  21. return dist;
  22. }

We will be using this function in the next analysis. As I said before, our goal is to augment our device records with extra information pertaining to time-derivatives. The following code adds the extra fields "distance", "interval", "acceleration", "angular_velocity" and "incline" to each device record by comparing it with the preceding record.

  1. var lastRecord = null; // We create a variable to remember what was the last record processed
  2.  
  3. var numProcessedRecords = 0;
  4. allNewCarReadings.forEach(function(record) {
  5. // Here we are reading a raw record from the car_readings collection, and then enhancing it with a few more
  6. // variables. These are the (1) id of the driver-vehicle combination and (2) the delta values between current and previous record
  7. numProcessedRecords += 1; // This is just for printing number of processed records when the program is running
  8. var lastTime; // This is the timestamp of the last record
  9. if (lastRecord !== null) {
  10. lastTime = lastRecord.data.eventtime;
  11. } else {
  12. lastTime = "";
  13. }
  14. var eventTime = record.data.eventtime;
  15. record.data.eventTimestamp = new Date(record.data.eventtime+'Z'); // The stored ISO string has no trailing 'Z', so append one to get a proper UTC timestamp
  16. // print('Eventtime = ' + eventTime);
  17. if (eventTime !== lastTime) { // this must be a new record
  18. var driverVehicle = record.data.vehicle + "_" + record.data.username;
  19. if (drivers.hasOwnProperty(driverVehicle))
  20. record.driverVehicleId = drivers[driverVehicle];
  21. else {
  22. drivers[driverVehicle] = maxDriverVehicleId;
  23. record.driverVehicleId = maxDriverVehicleId;
  24. maxDriverVehicleId += 1;
  25. }
  26.  
  27. record.delta = {}; // delta stores the difference between the current record and the previous record
  28. if (lastRecord !== null) {
  29. var timeDifference = record.data.eventTimestamp.getTime() - lastRecord.data.eventTimestamp.getTime(); // in milliseconds
  30. record.delta["distance"] = earth_distance_havesine(
  31. record.data.location.coordinates[1],
  32. record.data.location.coordinates[0],
  33. lastRecord.data.location.coordinates[1],
  34. lastRecord.data.location.coordinates[0],
  35. "K");
  36. if (timeDifference < 60000) {
  37. // if time difference is less than 60 seconds, only then can we consider it as part of the same session
  38. // print(JSON.stringify(lastRecord.data));
  39. record.delta["interval"] = timeDifference;
  40. record.delta["acceleration"] = 1000 * (record.data.speed - lastRecord.data.speed) / timeDifference;
  41. record.delta["angular_velocity"] = (record.data.heading - lastRecord.data.heading) / timeDifference;
  42. record.delta["incline"] = (record.data.altitude - lastRecord.data.altitude) / timeDifference;
  43. } else {
  44. // otherwise this is a new session. So we still store the records, but the delta calculation is all set to zero.
  45. record.delta["interval"] = timeDifference;
  46. record.delta["acceleration"] = 0.0;
  47. record.delta["angular_velocity"] = 0.0;
  48. record.delta["incline"] = 0.0;
  49. }
  50. db.getCollection('mldataset').insert(record);
  51. }
  52. }
  53. if (numProcessedRecords % 100 === 0)
  54. print("Processed " + numProcessedRecords + " records");
  55. lastRecord = record;
  56. });
  57. });
  58.  

Note that in line 50 I am saving the record in another collection called 'mldataset', which is the collection on which I will apply feature extraction for driver signatures. The final task is to save the book-keeping values in their respective documents.

  1. db.book_keeping.update(
  2. { _id: "driver_vehicles"},
  3. { @set: { drivers: drivers } },
  4. { upsert: true }
  5. );
  6.  
  7. // Save the end time to the database
  8. db.book_keeping.update(
  9. { _id: "processed_until" },
  10. { @set: { lastEndTime: endTime } },
  11. { upsert: true }
  12. );
  13.  

Creating the feature set for driver signatures

The next step is to create the feature sets for driver-signature analysis. I do this by first reading records from the augmented collection 'mldataset' and aggregating values over 15-second windows. For each field that contains a number (and happens to change often), I calculate three statistical values over the time window - the minimum, the maximum and the average. One could also include other statistics, such as variance or kurtosis - I have not tried those in this experiment yet, and adding them is an easy enhancement.
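If you do want to experiment with extra statistics, the aggregation framework already provides accumulators for some of them, for example $stdDevPop and $stdDevSamp (available from MongoDB 3.2) for the standard deviation. Here is a small pymongo-style sketch of what adding such fields to the group stage could look like; the field names and the simplified grouping key below are hypothetical, not part of the original script:

  1. # Hypothetical extra accumulators for the group stage, written in pymongo dict syntax
  2. # with the real '$' operators.
  3. extra_stats = {
  4.     'stdDevSpeed': {'$stdDevPop': '$data.speed'},  # population standard deviation of speed per window
  5.     'stdDevRpm': {'$stdDevPop': '$data.rpm'}       # population standard deviation of rpm per window
  6. }
  7.  
  8. # Simplified group stage (the real one uses the year/month/day/hour/minute/quarter key shown later).
  9. group_stage = {'$group': {'_id': '$driverVehicleId', 'averageSpeed': {'$avg': '$data.speed'}}}
  10. group_stage['$group'].update(extra_stats)
  11. print(group_stage)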

You will find all the code in the file 'extract_features_from_mldataset.js' under the 'machinelearning' directory.

Let us do some book-keeping first.

  1. var processedUntil = db.getCollection('book_keeping').findOne( { _id: "driver_vehicles_processed_until" } );
  2. var currentTime = new Date(); // This time includes the seconds value
  3. // Set the end time (for querying) to the current time till the last whole minute, excluding seconds
  4. var endTimeGlobal = new Date(Date.UTC(currentTime.getFullYear(), currentTime.getMonth(), currentTime.getDate(), currentTime.getHours(), currentTime.getMinutes(), 0, 0))
  5.  
  6. if (processedUntil === null) {
  7. db.book_keeping.insert( { _id: "driver_vehicles_processed_until", lastEndTimes: [] } ); // initialize to an empty array
  8. }
  9.  
  10. // Now do a query of the database to find out what records are new since we ran it last
  11. var startTimeForSearchingActiveDevices = new Date(endTimeGlobal.getTime() - (200*86400000)); // Go back 200 days
  12.  
  13. // Another book-keeping task is to read the driver-vehicle hash-table from the database.
  14. // Look up the book-keeping table to figure out the previous driver-vehicle codes (we have
  15. // numbers representing the combination of drivers and vehicles).
  16. var driverVehicles = db.getCollection('book_keeping').findOne( { _id: "driver_vehicles" } );
  17. var drivers;
  18. if (driverVehicles !== null)
  19. drivers = driverVehicles.drivers;
  20. else
  21. drivers = {};
  22.  
  23. var maxDriverVehicleId = 0;
  24. for (var key in drivers) {
  25. if (drivers.hasOwnProperty(key)) {
  26. maxDriverVehicleId = Math.max(maxDriverVehicleId, drivers[key]);
  27. }
  28. }
  29.  

Using the last time stamp stored in the system, we can figure out which records are new.

  1. // Now do a query of the database to find out what records are new since we ran it last
  2. var allNewCarDrivers = db.getCollection('mldataset').aggregate([
  3. {
  4. "@match": {
  5. "data.eventTimestamp" : { @gt: startTimeForSearchingActiveDevices, @lte: endTimeGlobal }
  6. }
  7. },
  8. {
  9. "@group": { _id: "@data.username" }
  10. }
  11. ]);
  12.  

Extracting features for each driver

Now is the time to do the actual feature extraction from the data-set. Here is the entire loop:

  1. allNewCarDrivers.forEach(function(driverId) {
  2. var driverName = driverId._id;
  3. print("Processing driver: " + driverName);
  4. var startTimeForDriver = startTimeForSearchingActiveDevices; // To begin with we start with the earliest start time we care about
  5.  
  6. var driverIsNew = true;
  7. // First find out if this device already has some records processed, and has a last end time defined
  8. var lastEndTimeDevice = db.getCollection('book_keeping').find(
  9. {
  10. _id: "driver_vehicles_processed_until",
  11. "lastEndTimes.driver": driverName
  12. },
  13. {
  14. _id: 0,
  15. 'lastEndTimes.@': 1
  16. }
  17. );
  18.  
  19. lastEndTimeDevice.forEach(function(record) {
  20. startTimeForDriver = record.lastEndTimes[0].endTime;
  21. driverIsNew = false;
  22. });
  23.  
  24. //print('Starting time for driver is ' + startTimeForDriver.toISOString());
  25. //print('endTimeGlobal = ' + endTimeGlobal.toISOString());
  26.  
  27. var allNewCarReadings = db.getCollection('mldataset').aggregate([
  28. {
  29. "@match": { // 1. Match all records that fall within the time range we have decided to use. Note that this is being
  30. // done on a live database - which means that new data is coming in while we are trying to analyze it.
  31. // Thus we have to pin both the starting time and the ending time. Pinning the endtime to the starting time
  32. // of the application ensures that we will be accurately picking up only the NEW records when the program
  33. // runs again the next time.
  34. "data.eventTimestamp": {@gt: startTimeForDriver, @lte: endTimeGlobal},
  35. "data.username": driverName // Only consider the records for this specific driver, ignoring all others.
  36. }
  37. },
  38. {
  39. @project: { // We only need to consider a few fields for our analysis. This eliminates the summaries from our analysis.
  40. "data": 1,
  41. "account": 1,
  42. "delta": 1,
  43. "driverVehicleId": 1,
  44. "_id": 0
  45. }
  46. },
  47. {
  48. "@group": {
  49. "_id": {
  50. year: {@year: "@data.eventTimestamp"},
  51. month: {@month: "@data.eventTimestamp"},
  52. day: {@dayOfMonth: "@data.eventTimestamp"},
  53. hour: {@hour: "@data.eventTimestamp"},
  54. minute: {@minute: "@data.eventTimestamp"},
  55. quarter: {@mod: [{@second: "@data.eventTimestamp"}, 4]}
  56. },
  57.  
  58. "averageGPSLatitude": {@avg: {"@arrayElemAt": ["@data.location.coordinates", 1]}},
  59. "averageGPSLongitude": {@avg: {"@arrayElemAt": ["@data.location.coordinates", 0]}},
  60.  
  61. "averageLoad": {@avg: "@data.load"},
  62. "minLoad": {@min: "@data.load"},
  63. "maxLoad": {@max: "@data.load"},
  64.  
  65. "averageThrottlePosB": {@avg: "@data.abs_throttle_pos_b"},
  66. "minThrottlePosB": {@min: "@data.abs_throttle_pos_b"},
  67. "maxThrottlePosB": {@max: "@data.abs_throttle_pos_b"},
  68.  
  69. "averageRpm": {@avg: "@data.rpm"},
  70. "minRpm": {@min: "@data.rpm"},
  71. "maxRpm": {@max: "@data.rpm"},
  72.  
  73. "averageThrottlePos": {@avg: "@data.throttle_pos"},
  74. "minThrottlePos": {@min: "@data.throttle_pos"},
  75. "maxThrottlePos": {@max: "@data.throttle_pos"},
  76.  
  77. "averageIntakeAirTemp": {@avg: "@data.intake_air_temp"},
  78. "minIntakeAirTemp": {@min: "@data.intake_air_temp"},
  79. "maxIntakeAirTemp": {@max: "@data.intake_air_temp"},
  80.  
  81. "averageSpeed": {@avg: "@data.speed"},
  82. "minSpeed": {@min: "@data.speed"},
  83. "maxSpeed": {@max: "@data.speed"},
  84.  
  85. "averageAltitude": {@avg: "@data.altitude"},
  86. "minAltitude": {@min: "@data.altitude"},
  87. "maxAltitude": {@max: "@data.altitude"},
  88.  
  89. "averageCommThrottleAc": {@avg: "@data.comm_throttle_ac"},
  90. "minCommThrottleAc": {@min: "@data.comm_throttle_ac"},
  91. "maxCommThrottleAc": {@max: "@data.comm_throttle_ac"},
  92.  
  93. "averageEngineTime": {@avg: "@data.engine_time"},
  94. "minEngineTime": {@min: "@data.engine_time"},
  95. "maxEngineTime": {@max: "@data.engine_time"},
  96.  
  97. "averageAbsLoad": {@avg: "@data.abs_load"},
  98. "minAbsLoad": {@min: "@data.abs_load"},
  99. "maxAbsLoad": {@max: "@data.abs_load"},
  100.  
  101. "averageGear": {@avg: "@data.gear"},
  102. "minGear": {@min: "@data.gear"},
  103. "maxGear": {@max: "@data.gear"},
  104.  
  105. "averageRelThrottlePos": {@avg: "@data.rel_throttle_pos"},
  106. "minRelThrottlePos": {@min: "@data.rel_throttle_pos"},
  107. "maxRelThrottlePos": {@max: "@data.rel_throttle_pos"},
  108.  
  109. "averageAccPedalPosE": {@avg: "@data.acc_pedal_pos_e"},
  110. "minAccPedalPosE": {@min: "@data.acc_pedal_pos_e"},
  111. "maxAccPedalPosE": {@max: "@data.acc_pedal_pos_e"},
  112.  
  113. "averageAccPedalPosD": {@avg: "@data.acc_pedal_pos_d"},
  114. "minAccPedalPosD": {@min: "@data.acc_pedal_pos_d"},
  115. "maxAccPedalPosD": {@max: "@data.acc_pedal_pos_d"},
  116.  
  117. "averageGpsSpeed": {@avg: "@data.gps_speed"},
  118. "minGpsSpeed": {@min: "@data.gps_speed"},
  119. "maxGpsSpeed": {@max: "@data.gps_speed"},
  120.  
  121. "averageShortTermFuelTrim2": {@avg: "@data.short_term_fuel_trim_2"},
  122. "minShortTermFuelTrim2": {@min: "@data.short_term_fuel_trim_2"},
  123. "maxShortTermFuelTrim2": {@max: "@data.short_term_fuel_trim_2"},
  124.  
  125. "averageO211": {@avg: "@data.o211"},
  126. "minO211": {@min: "@data.o211"},
  127. "maxO211": {@max: "@data.o211"},
  128.  
  129. "averageO212": {@avg: "@data.o212"},
  130. "minO212": {@min: "@data.o212"},
  131. "maxO212": {@max: "@data.o212"},
  132.  
  133. "averageShortTermFuelTrim1": {@avg: "@data.short_term_fuel_trim_1"},
  134. "minShortTermFuelTrim1": {@min: "@data.short_term_fuel_trim_1"},
  135. "maxShortTermFuelTrim1": {@max: "@data.short_term_fuel_trim_1"},
  136.  
  137. "averageMaf": {@avg: "@data.maf"},
  138. "minMaf": {@min: "@data.maf"},
  139. "maxMaf": {@max: "@data.maf"},
  140.  
  141. "averageTimingAdvance": {@avg: "@data.timing_advance"},
  142. "minTimingAdvance": {@min: "@data.timing_advance"},
  143. "maxTimingAdvance": {@max: "@data.timing_advance"},
  144.  
  145. "averageClimb": {@avg: "@data.climb"},
  146. "minClimb": {@min: "@data.climb"},
  147. "maxClimb": {@max: "@data.climb"},
  148.  
  149. "averageFuelPressure": {@avg: "@data.fuel_pressure"},
  150. "minFuelPressure": {@min: "@data.fuel_pressure"},
  151. "maxFuelPressure": {@max: "@data.fuel_pressure"},
  152.  
  153. "averageTemp": {@avg: "@data.temp"},
  154. "minTemp": {@min: "@data.temp"},
  155. "maxTemp": {@max: "@data.temp"},
  156.  
  157. "averageAmbientAirTemp": {@avg: "@data.ambient_air_temp"},
  158. "minAmbientAirTemp": {@min: "@data.ambient_air_temp"},
  159. "maxAmbientAirTemp": {@max: "@data.ambient_air_temp"},
  160.  
  161. "averageManifoldPressure": {@avg: "@data.manifold_pressure"},
  162. "minManifoldPressure": {@min: "@data.manifold_pressure"},
  163. "maxManifoldPressure": {@max: "@data.manifold_pressure"},
  164.  
  165. "averageLongTermFuelTrim1": {@avg: "@data.long_term_fuel_trim_1"},
  166. "minLongTermFuelTrim1": {@min: "@data.long_term_fuel_trim_1"},
  167. "maxLongTermFuelTrim1": {@max: "@data.long_term_fuel_trim_1"},
  168.  
  169. "averageLongTermFuelTrim2": {@avg: "@data.long_term_fuel_trim_2"},
  170. "minLongTermFuelTrim2": {@min: "@data.long_term_fuel_trim_2"},
  171. "maxLongTermFuelTrim2": {@max: "@data.long_term_fuel_trim_2"},
  172.  
  173. "averageGPSAcceleration": {@avg: "@delta.acceleration"},
  174. "minGPSAcceleration": {@min: "@delta.acceleration"},
  175. "maxGPSAcceleration": {@max: "@delta.acceleration"},
  176.  
  177. "averageHeadingChange": {@avg: {@abs: "@delta.angular_velocity"}},
  178. "minHeadingChange": {@min: {@abs: "@delta.angular_velocity"}},
  179. "maxHeadingChange": {@max: {@abs: "@delta.angular_velocity"}},
  180.  
  181. "averageIncline": {@avg: "@data.incline"},
  182. "minIncline": {@min: "@data.incline"},
  183. "maxIncline": {@max: "@data.incline"},
  184.  
  185. "averageAcceleration": {@avg: "@delta.acceleration"},
  186. "minAcceleration": {@min: "@delta.acceleration"},
  187. "maxAcceleration": {@max: "@delta.acceleration"},
  188.  
  189. // "dtcCodes": {"@push": "@data.dtc_status"},
  190. "accountIdArray": {@addToSet: "@account"},
  191.  
  192. "vehicleArray": {@addToSet: "@data.vehicle"},
  193. "driverArray": {@addToSet: "@data.username"},
  194. "driverVehicleArray": {@addToSet: "@driverVehicleId"},
  195.  
  196. "count": {@sum: 1}
  197. }
  198. },
  199. {
  200. @sort: {
  201. "_id": 1 // Finally sort the data based on eventtime in ascending order
  202. }
  203. }
  204. ],
  205. {
  206. allowDiskUse: true
  207. }
  208. );
  209.  

For each driver (or rather, driver-vehicle combination) that is identified, the first task is to figure out the last processing time for that driver and find all new records (lines 6 to 22). The next task of aggregating over 15-second windows is a MongoDB aggregation step starting from line 27. Aggregation tasks in MongoDB are described as a pipeline, where each element of the flow does a certain task and passes the result on to the next element in the pipe. The first stage matches all records within the time-span that we want to process (lines 29 to 36). Then we only need to consider (i.e. project) the few fields that are of interest to us (lines 38 to 44). The '@group' element of the pipeline does the actual job of aggregation. The key to this aggregation step is the group-by _id that is created using a 'quarter' (line 55), which is simply a number between 0 and 3 derived from the seconds value of the time-stamp. This effectively creates the time windows needed for aggregation.

The actual aggregation steps are quite repetitive. See, for example, lines 61 to 63, where the average, minimum and maximum load are calculated over each time window. This is repeated for all the variables that we want in the feature-set. Finally, the results are sorted by event-time (lines 200 to 202).

Saving the feature-set in a collection

The features thus calculated are saved to a new collection, on which I will later apply a machine-learning algorithm to create a model. The collection is called 'vehicle_signature_records', and the feature-set records are saved as follows:

  1. var lastRecordedTimeForDriver = startTimeForDriver;
  2. var insertCounter = 0;
  3. allNewCarReadings.forEach(function (record) {
  4. var currentRecordEventTime = new Date(Date.UTC(record._id.year, record._id.month - 1, record._id.day, record._id.hour, record._id.minute, record._id.quarter * 15, 0));
  5. if (currentRecordEventTime >= lastRecordedTimeForDriver)
  6. lastRecordedTimeForDriver = new Date(Date.UTC(record._id.year, record._id.month - 1, record._id.day, record._id.hour, record._id.minute, 59, 999));
  7.  
  8. record['eventTime'] = currentRecordEventTime;
  9. record['eventId'] = record._id;
  10. delete record._id;
  11. record['accountId'] = record.accountIdArray[0];
  12. delete record.accountIdArray;
  13.  
  14. record['vehicle'] = record.vehicleArray[0];
  15. delete record.vehicleArray;
  16.  
  17. record['driver'] = record.driverArray[0];
  18. delete record.driverArray;
  19.  
  20. record['driverVehicle'] = record.driverVehicleArray[0];
  21. delete record.driverVehicleArray;
  22.  
  23. record.averageGPSLatitude = parseInt((record.averageGPSLatitude * 1000).toFixed(3)) / 1000;
  24. record.averageGPSLongitude = parseInt((record.averageGPSLongitude * 1000).toFixed(3)) / 1000;
  25.  
  26. db.getCollection('vehicle_signature_records').insert(record);
  27. insertCounter += 1;
  28. });

The code above adds a few more variables to the result sent back by the aggregation - identifying the driver, the vehicle and the driver-vehicle combination (lines 8 to 21) - and saves it to the database (line 26). However, lines 23 and 24 need an explanation, since they do something important!

Coding the approximate location of the driver

One of the interesting observations I made while working on this problem is that you can dramatically improve prediction accuracy if you encode the approximate location of the driver. Imagine working on this problem for millions of drivers scattered all across the country. Most drivers drive around a certain location most of the time, so if that location is somehow encoded into the model, the model can converge quickly based on it. Lines 23 and 24 attempt to do just that: they store the latitude and longitude with reduced precision, producing two numbers that represent the approximate location.
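To make the arithmetic concrete, here is roughly what lines 23 and 24 do to a single coordinate - a Python illustration of the truncation; one thousandth of a degree of latitude is on the order of a hundred metres, so only the neighbourhood of the driver is retained:

  1. # Roughly equivalent to parseInt((x * 1000).toFixed(3)) / 1000 in the JavaScript above.
  2. raw_latitude = 37.38612345            # hypothetical raw GPS latitude
  3. encoded_latitude = int(raw_latitude * 1000) / 1000
  4. print(encoded_latitude)               # -> 37.386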

Some more book-keeping

As a final step, we store the book-keeping values.

  1. if (driverIsNew) { // which means this is a new device with no record
  2. db.book_keeping.update(
  3. {_id: 'driver_vehicles_processed_until'},
  4. {@push: {'lastEndTimes': {driver: driverName, endTime: lastRecordedTimeForDriver}}}
  5. );
  6. } else {
  7. var nowDate = new Date();
  8. db.book_keeping.update(
  9. {_id: 'driver_vehicles_processed_until', 'lastEndTimes.driver': driverName},
  10. {@set: {'lastEndTimes.@.endTime': lastRecordedTimeForDriver, 'lastEndTimes.@.driver': driverName}} // lastRecordedTimeForDriver
  11. );
  12. }
  13.  

After doing all this work (by now you may be exhausted just from reading about it), we are finally ready to apply some real machine-learning algorithms. Remember, I said before that 95% of a data scientist's work is in preparing, collecting, consolidating and cleaning the data. You are seeing a live example of that!

In big companies there are people called data engineers who would do part of this job, but not everyone is fortunate enough to have data engineers working for them. Besides, if you can do all this work yourself, you become more indispensable to the company you work for - so it makes sense to develop these skills along with your analysis skills as a data scientist.

Building a Machine Learning Model

Fortunately, the data has been created in a clean way, so there is no further clean-up required. Our data is in a MongoDB collection called 'vehicle_signature_records'. If you are a pure Data Scientist the following should be very familiar to you. The only difference between what I am going to do now and what you generally find in books and blogs is the data source: I am going to read my data-sets directly from the MongoDB database instead of from CSV files. After reading the above, you must by now be something of an expert at understanding MongoDB document structures. If not, don't worry - all the data stored in this collection is flat, i.e. all values are present at the top level of each record. To illustrate how the data looks, let me show you one record from the collection.

  1. {
  2. "_id" : ObjectId("5a3028db7984b918e715c2a7"),
  3. "averageGPSLatitude" : 37.386,
  4. "averageGPSLongitude" : -121.96,
  5. "averageLoad" : 24.80392156862745,
  6. "minLoad" : 0.0,
  7. "maxLoad" : 68.62745098039215,
  8. "averageThrottlePosB" : 29.11764705882353,
  9. "minThrottlePosB" : 14.901960784313726,
  10. "maxThrottlePosB" : 38.03921568627451,
  11. "averageRpm" : 1216.25,
  12. "minRpm" : 516.0,
  13. "maxRpm" : 1486.0,
  14. "averageThrottlePos" : 20.49019607843137,
  15. "minThrottlePos" : 11.764705882352942,
  16. "maxThrottlePos" : 36.86274509803921,
  17. "averageIntakeAirTemp" : 85.5,
  18. "minIntakeAirTemp" : 84.0,
  19. "maxIntakeAirTemp" : 86.0,
  20. "averageSpeed" : 13.517712865133625,
  21. "minSpeed" : 0.0,
  22. "maxSpeed" : 24.238657551274084,
  23. "averageAltitude" : -1.575,
  24. "minAltitude" : -1.9,
  25. "maxAltitude" : -1.2,
  26. "averageCommThrottleAc" : 25.392156862745097,
  27. "minCommThrottleAc" : 6.2745098039215685,
  28. "maxCommThrottleAc" : 38.431372549019606,
  29. "averageEngineTime" : 32.25,
  30. "minEngineTime" : 32.0,
  31. "maxEngineTime" : 33.0,
  32. "averageAbsLoad" : 40.3921568627451,
  33. "minAbsLoad" : 18.431372549019606,
  34. "maxAbsLoad" : 64.31372549019608,
  35. "averageGear" : 0.0,
  36. "minGear" : 0.0,
  37. "maxGear" : 0.0,
  38. "averageRelThrottlePos" : 19.019607843137255,
  39. "minRelThrottlePos" : 4.705882352941177,
  40. "maxRelThrottlePos" : 27.84313725490196,
  41. "averageAccPedalPosE" : 14.607843137254902,
  42. "minAccPedalPosE" : 9.411764705882353,
  43. "maxAccPedalPosE" : 19.215686274509803,
  44. "averageAccPedalPosD" : 30.19607843137255,
  45. "minAccPedalPosD" : 18.823529411764707,
  46. "maxAccPedalPosD" : 39.21568627450981,
  47. "averageGpsSpeed" : 6.720000000000001,
  48. "minGpsSpeed" : 0.0,
  49. "maxGpsSpeed" : 12.82,
  50. "averageShortTermFuelTrim2" : -0.5,
  51. "minShortTermFuelTrim2" : -1.0,
  52. "maxShortTermFuelTrim2" : 1.0,
  53. "averageO211" : 9698.5,
  54. "minO211" : 1191.0,
  55. "maxO211" : 27000.0,
  56. "averageO212" : 30349.0,
  57. "minO212" : 28299.0,
  58. "maxO212" : 32499.0,
  59. "averageShortTermFuelTrim1" : -0.25,
  60. "minShortTermFuelTrim1" : -2.0,
  61. "maxShortTermFuelTrim1" : 4.0,
  62. "averageMaf" : 2.4332170200000003,
  63. "minMaf" : 0.77513736,
  64. "maxMaf" : 7.0106280000000005,
  65. "averageTimingAdvance" : 28.0,
  66. "minTimingAdvance" : 16.5,
  67. "maxTimingAdvance" : 41.0,
  68. "averageClimb" : -0.025,
  69. "minClimb" : -0.2,
  70. "maxClimb" : 0.1,
  71. "averageFuelPressure" : null,
  72. "minFuelPressure" : null,
  73. "maxFuelPressure" : null,
  74. "averageTemp" : 199.0,
  75. "minTemp" : 199.0,
  76. "maxTemp" : 199.0,
  77. "averageAmbientAirTemp" : 77.75,
  78. "minAmbientAirTemp" : 77.0,
  79. "maxAmbientAirTemp" : 78.0,
  80. "averageManifoldPressure" : 415.4026475455047,
  81. "minManifoldPressure" : 248.2073910645339,
  82. "maxManifoldPressure" : 592.9398786541643,
  83. "averageLongTermFuelTrim1" : 3.25,
  84. "minLongTermFuelTrim1" : -1.0,
  85. "maxLongTermFuelTrim1" : 7.0,
  86. "averageLongTermFuelTrim2" : -23.5,
  87. "minLongTermFuelTrim2" : -100.0,
  88. "maxLongTermFuelTrim2" : 7.0,
  89. "averageGPSAcceleration" : 1.0196509034930195,
  90. "minGPSAcceleration" : 0.0,
  91. "maxGPSAcceleration" : 1.9128551867763974,
  92. "averageHeadingChange" : 0.006215710862578118,
  93. "minHeadingChange" : 0.0,
  94. "maxHeadingChange" : 0.013477895914941244,
  95. "averageIncline" : null,
  96. "minIncline" : null,
  97. "maxIncline" : null,
  98. "averageAcceleration" : 1.0196509034930195,
  99. "minAcceleration" : 0.0,
  100. "maxAcceleration" : 1.9128551867763974,
  101. "count" : 4.0,
  102. "eventTime" : ISODate("2017-07-18T18:11:30.000+0000"),
  103. "eventId" : {
  104. "year" : NumberInt(2017),
  105. "month" : NumberInt(7),
  106. "day" : NumberInt(18),
  107. "hour" : NumberInt(18),
  108. "minute" : NumberInt(11),
  109. "quarter" : NumberInt(2)
  110. },
  111. "accountId" : "17350",
  112. "vehicle" : "toyota-highlander-2005",
  113. "driver" : "anupam",
  114. "driverVehicle" : 12.0
  115. }

That's quite a number of values for analysis! This is a good sign for us - more values give us more options to play with.

As you may have realized by now, I have come to the final stage of building the model which is a traditional machine-learning task that is usually done in Python or R. So the final piece will be written in Python. You will find the entire code at 'driver_signature_build_model_scikit.py' in the 'machinelearning' directory.

Feature selection and elimination

As is common in any data-science project, one must first take a look at the data and determine whether any features need to be eliminated. If some features make no sense for the model we are building, they should be dropped. One quick observation is that fuel pressure and incline have nothing to do with driver signatures, so I will eliminate those values from any further consideration.
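In the script this elimination simply happens by leaving those columns out of the feature lists below. If you prefer to drop them from the data-frame explicitly, something along these lines would do it (a sketch using a toy data-frame in place of the one read from 'vehicle_signature_records'):

  1. import pandas as pd
  2.  
  3. # Toy data-frame standing in for the one read from the 'vehicle_signature_records' collection.
  4. df = pd.DataFrame({'averageFuelPressure': [None], 'minIncline': [None], 'averageSpeed': [13.5]})
  5.  
  6. drop_cols = [c for c in df.columns if 'FuelPressure' in c or 'Incline' in c]
  7. df = df.drop(columns=drop_cols)
  8. print(df.columns.tolist())   # -> ['averageSpeed']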

Specifically for this problem, you need to do something special - a bit unusual, but required in this scenario.

If you look at the features carefully you will notice that some of them are driver characteristics while others are vehicle characteristics. It is important not to mix up the two sets. I have used my judgement to separate the features into two sets as follows.

  1. vehicle_features = [
  2. "averageLoad",
  3. "minLoad",
  4. "maxLoad",
  5. "averageRpm",
  6. "minRpm",
  7. "maxRpm",
  8. "averageEngineTime",
  9. "minEngineTime",
  10. "maxEngineTime",
  11. "averageAbsLoad",
  12. "minAbsLoad",
  13. "maxAbsLoad",
  14. "averageAccPedalPosE",
  15. "minAccPedalPosE",
  16. "maxAccPedalPosE",
  17. "averageAccPedalPosD",
  18. "minAccPedalPosD",
  19. "maxAccPedalPosD",
  20. "averageShortTermFuelTrim2",
  21. "minShortTermFuelTrim2",
  22. "maxShortTermFuelTrim2",
  23. "averageO211",
  24. "minO211",
  25. "maxO211",
  26. "averageO212",
  27. "minO212",
  28. "maxO212",
  29. "averageShortTermFuelTrim1",
  30. "minShortTermFuelTrim1",
  31. "maxShortTermFuelTrim1",
  32. "averageMaf",
  33. "minMaf",
  34. "maxMaf",
  35. "averageTimingAdvance",
  36. "minTimingAdvance",
  37. "maxTimingAdvance",
  38. "averageTemp",
  39. "minTemp",
  40. "maxTemp",
  41. "averageManifoldPressure",
  42. "minManifoldPressure",
  43. "maxManifoldPressure",
  44. "averageLongTermFuelTrim1",
  45. "minLongTermFuelTrim1",
  46. "maxLongTermFuelTrim1",
  47. "averageLongTermFuelTrim2",
  48. "minLongTermFuelTrim2",
  49. "maxLongTermFuelTrim2"
  50. ]
  51.  
  52. driver_features = [
  53. "averageGPSLatitude",
  54. "averageGPSLongitude",
  55. "averageThrottlePosB",
  56. "minThrottlePosB",
  57. "maxThrottlePosB",
  58. "averageThrottlePos",
  59. "minThrottlePos",
  60. "maxThrottlePos",
  61. "averageIntakeAirTemp",
  62. "minIntakeAirTemp",
  63. "maxIntakeAirTemp",
  64. "averageSpeed",
  65. "minSpeed",
  66. "maxSpeed",
  67. "averageAltitude",
  68. "minAltitude",
  69. "maxAltitude",
  70. "averageCommThrottleAc",
  71. "minCommThrottleAc",
  72. "maxCommThrottleAc",
  73. "averageGear",
  74. "minGear",
  75. "maxGear",
  76. "averageRelThrottlePos",
  77. "minRelThrottlePos",
  78. "maxRelThrottlePos",
  79. "averageGpsSpeed",
  80. "minGpsSpeed",
  81. "maxGpsSpeed",
  82. "averageClimb",
  83. "minClimb",
  84. "maxClimb",
  85. "averageAmbientAirTemp",
  86. "minAmbientAirTemp",
  87. "maxAmbientAirTemp",
  88. "averageGPSAcceleration",
  89. "minGPSAcceleration",
  90. "maxGPSAcceleration",
  91. "averageHeadingChange",
  92. "minHeadingChange",
  93. "maxHeadingChange",
  94. "averageAcceleration",
  95. "minAcceleration",
  96. "maxAcceleration"
  97. ]

Having done this, we now need to build two different models - one to predict the driver and another to predict the vehicle. It will be an interesting exercise to see which of the two models has better accuracy.

Reading directly from database instead of CSV

For the sake of completeness, let me first give you the two utility functions that are used to pull data out of the MongoDB database.

  1. def _connect_mongo(host, port, username, password, db):
  2. """ A utility for making a connection to MongoDB """
  3. if username and password:
  4. mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
  5. conn = MongoClient(mongo_uri)
  6. else:
  7. conn = MongoClient(host, port)
  8. return conn[db]
  9.  
  10. def read_mongo(db, collection, query={}, projection='', limit=1000, host='localhost', port=27017, username=None, password=None, no_id=False):
  11. """ Read from Mongo and Store into DataFrame """
  12. db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
  13. cursor = db[collection].find(query, projection).limit(limit)
  14. datalist = list(cursor)
  15. sanitized = json.loads(json_util.dumps(datalist))
  16. normalized = json_normalize(sanitized)
  17. df = pd.DataFrame(normalized)
  18.  
  19. return df

The read_mongo function above is Pandas-friendly: it reads data from the MongoDB database and returns a Pandas data-frame, so you can get to work on the machine-learning part immediately.
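For these helpers to work, the script needs the usual imports. Based on the calls made above they look something like the following - the exact lines are in the repository, and the location of json_normalize depends on your pandas version:

  1. # Imports assumed by the two helper functions above.
  2. import json
  3.  
  4. import pandas as pd
  5. from bson import json_util                 # ships with pymongo
  6. from pandas.io.json import json_normalize  # in newer pandas: from pandas import json_normalize
  7. from pymongo import MongoClient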

In case you are not comfortable with MongoDB, I am giving you the entire dataset of aggregated values in CSV format so that you can import it into any database you wish. The file is in GZIP format, so you need to unzip it before reading it. For those of you who are comfortable with MongoDB, here is the entire database dump.

Building a Machine Learning model

Now it is time to build the learning model. At program invocation two parameters are needed - the database host and which feature set to build the model for. This is handled in the code as follows:

  1. DATABASE_HOST = argv[0]
  2. CHOSEN_FEATURE_SET = argv[1]
  3.  
  4. readFromDatabase = True
  5. read_and_proceed = False
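Since argv[0] here holds the database host rather than the script name, the script presumably slices off sys.argv[0] first - something along these lines (an assumption; check driver_signature_build_model_scikit.py for the exact code):

  1. import sys
  2.  
  3. # Assumed preamble: drop the script name so that argv[0] is the first real parameter.
  4. argv = sys.argv[1:]
  5.  
  6. DATABASE_HOST = argv[0]        # e.g. 'localhost'
  7. CHOSEN_FEATURE_SET = argv[1]   # 'vehicle' or 'driver'

A typical invocation would then be something like 'python driver_signature_build_model_scikit.py localhost driver'.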

Then I have some logic for setting the appropriate feature set within the application.

  1. if (CHOSEN_FEATURE_SET == 'vehicle'):
  2. features = vehicle_features
  3. feature_name = 'vehicle'
  4. class_variables = ['vehicle'] # Declare the vehicle as a class variable
  5. elif (CHOSEN_FEATURE_SET == 'driver'):
  6. features = driver_features
  7. feature_name = 'driver'
  8. class_variables = ['driver'] # Declare the driver as a class variable
  9. else:
  10. features = all_features
  11. feature_name = 'driverVehicleId'
  12. class_variables = ['driverVehicleId'] # Declare the driver-vehicle combo as a class variable
  13.  
  14. if readFromDatabase:
  15. if CHOSEN_FEATURE_SET == 'driver': # Choose the records only for one vehicle which has multiple drivers
  16. df = read_mongo('obd2', 'vehicle_signature_records', {"vehicle": {"\)regex" : ".*gmc-denali.*"}, "eventTime": {"\(gte": startTime, "\)lte": endTime} }, {"_id": 0}, 1000000, DATABASE_HOST, 27017, None, None, True )
  17. else:
  18. df = read_mongo('obd2', 'vehicle_signature_records', {"eventTime": {"\(gte": startTime, "\)lte": endTime} }, {"_id": 0}, 1000000, DATABASE_HOST, 27017, None, None, True )

The following part is mostly boiler-plate code to break up the dataset into a training set, test set and validation set. While doing so all null values are set to zero as well.

  1. # First randomize the entire dataset
  2. df = df.sample(frac=1).reset_index(drop=True)
  3.  
  4. # Then choose only a small subset of the data, frac=1 means choose everything
  5. df = df.sample(frac=1, replace=True)
  6.  
  7. df.fillna(value=0, inplace=True)
  8.  
  9. train_df, test_df, validate_df = np.split(df, [int(.8*len(df)), int(.9*len(df))])
  10.  
  11. df[feature_name] = df[feature_name].astype('category')
  12.  
  13. y_train = train_df[class_variables]
  14. X_train = train_df.reindex(columns=features)
  15. X_train.replace('NODATA', 0, regex=False, inplace=True)
  16. X_train.fillna(value=0, inplace=True)
  17.  
  18. y_test = test_df[class_variables]
  19. X_test = test_df.reindex(columns=features)
  20. X_test.replace('NODATA', 0, regex=False, inplace=True)
  21. X_test.fillna(value=0, inplace=True)
  22.  
  23. y_validate = validate_df[class_variables]
  24. X_validate = validate_df.reindex(columns=features)
  25. X_validate.replace('NODATA', 0, regex=False, inplace=True)
  26. X_validate.fillna(value=0, inplace=True)

Building a Random Forest Classifier and saving it

After trying out various classifiers with this dataset, it turns out that a Random Forest classifier gives the best accuracy. Here is a graph showing the accuracy of the different classifiers used with this data set. The two best algorithms turn out to be Classification & Regression and the Random Forest classifier. I chose the Random Forest classifier since it is an ensemble technique and will be more resilient.

Raspberry Pi Algorithm Comparison

This is what you need to do to build a Random Forest classifier with this dataset.

  1. dt = RandomForestClassifier(n_estimators=20, min_samples_leaf=1, max_depth=20, min_samples_split=2, random_state=0)
  2. dt.fit(X_train, y_train.values.ravel())
  3.  
  4. joblib.dump(dt, model_file)
  5. print('...done. Your Random Forest classifier has been saved in file: ' + model_file)

After building the model, I am saving it in a file (line 4) so that it can be read easily when doing the prediction. To find out how well the model is doing, we have to use the test set to make a prediction and evaluate the model score.

  1. y_pred = dt.predict(X_test)
  2. y_test_as_matrix = y_test.as_matrix()
  3. print('Completed generating predicted set')
  4.  
  5. print ('Confusion Matrix')
  6. print(confusion_matrix(y_test, y_pred))
  7.  
  8. crossValScore = cross_val_score(dt, X_validate, y_validate)
  9. model_score = dt.score(X_test, y_test_as_matrix)
  10. print('Cross validation score = ' + str(crossValScore))
  11. print('Model score = ' + str(model_score))
  12. print ('Precision, Recall and FScore')
  13. precision, recall, fscore, _ = prf(y_test, y_pred, pos_label=1, average='micro')
  14. print('Precision: ' + str(precision))
  15. print('Recall:' + str(recall))
  16. print('FScore:' + str(fscore))

Many kinds of evaluation metrics are calculated and printed in the code segment above. The most important one I tend to look at is the overall model score, but the others give you a good idea of the bias and variance, which indicate how resilient your model is to changing values.
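For reference, the classifier and the evaluation metrics used in the snippets above come from scikit-learn, so the imports look something like this (module paths vary across scikit-learn versions - for example, joblib moved out of sklearn.externals in newer releases):

  1. # Imports assumed by the model-building and evaluation snippets above.
  2. from sklearn.ensemble import RandomForestClassifier
  3. from sklearn.externals import joblib               # in newer scikit-learn: import joblib
  4. from sklearn.metrics import confusion_matrix
  5. from sklearn.metrics import precision_recall_fscore_support as prf
  6. from sklearn.model_selection import cross_val_score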

Measure of importance

One interesting analysis is to figure out which of the features has the most impact on the result. This can be done using the simple code fragment below:

  1. importance_indices = {}
  2. for z in range(0, len(dt.feature_importances_)):
  3. importance_indices[z] = dt.feature_importances_[z]
  4.  
  5. sorted_importance_indices = sorted(importance_indices.items(), key=operator.itemgetter(1), reverse=True)
  6.  
  7. for k1 in sorted_importance_indices:
  8. print(features[int(k1[0])] + ' -> ' + str(k1[1]))

Prediction results and Conclusion

After running the two cases, namely driver prediction and vehicle prediction, I typically get the following scores.

Driver Prediction Using Raspberry Pi results

This is encouraging, given that there was always an apprehension that the score would not be accurate enough due to the low frequency of data collection. This is an important factor, since we are creating this model out of instantaneous time-derivatives of values, and a low sampling rate introduces significant error. The dataset has 13 different driver-vehicle combinations. There isn't a whole lot of driving data beyond the experiments that were done, but with an accuracy of 95% or above, there may be some value in this approach.

Another interesting fact is that the vehicle prediction comes out more accurate than the driver prediction. In other words, the parameters emitted by the car characterize the car more strongly than the driver. Most drivers drive in a similar way, but the mechanical characteristics of the car distinguish it more clearly.

Commercial Use Cases

I have shown you one example of the many applications that can be built with an approach like this. It just involves equipping your car with a smart device like a Raspberry Pi; the rest is all backend server-side work. Here are all the use-cases I can think of. You can take up any of these as your own project and attempt to find a solution.

  1. Parking assistance
  2. Adaptive collision detection
  3. Video evidence recording
  4. Detect abusive driving
  5. Crash detection
  6. Theft detection
  7. Parking meter
  8. Mobile hot-spot
  9. Voice recognition
  10. Connect racing equipment
  11. Head Unit display
  12. Traffic sign warning
  13. Pattern of usage
  14. Reset fault codes
  15. Driver recognition (this is already demonstrated here!)
  16. Emergency braking alert
  17. Animal overheating protection
  18. Remote start
  19. Remote seatbelt notifications
  20. Radio volume regulation
  21. Auto radio off when window down
  22. Eco-driving optimization alerts
  23. Auto lock/unlock

Commercial product

After doing this experiment building a Raspberry Pi kit from scratch, I found out that there is a product called AutoPi that you can buy, which cuts out a lot of the hardware setup. I have no affiliation with AutoPi, but I thought it was interesting that this subject is being treated quite seriously by some companies in Europe.

 

 

This is the second article in the series on determining driver signatures from OBD data using a Raspberry Pi. In the first article I described in detail how to construct your Raspberry Pi. Now let us write some code to read data from your car and put it to the test. In this article I will describe the software needed to read data from your car's CAN bus, combine it with data captured from the GPS antenna attached to your Raspberry Pi, assemble it into one packet and send it over to the cloud. I will show you the software setup for capturing data on the client (the Raspberry Pi), storing it locally, compressing that data on a periodic basis, encrypting it and sending it to a cloud server. I will also show you the server setup you need on the cloud to receive the data coming in from the Raspberry Pi, decrypt it and store it in a database or push it to a messaging queue for streaming purposes. All work will be done in Python. My database of choice for this project is MongoDB.

Before you read this article, I would encourage you to read the first article of this series so that you know what hardware setup you need to reproduce this yourself.

Capturing OBD data locally

To begin with, let us see how we can capture data on the Raspberry Pi and save it locally. Since this is the first task that needs to be accomplished, let us figure out a way to capture data constantly and save it somewhere. The data-transmittal task is achieved using two processes.

  1. Capture data constantly and keep saving it to a local database.
  2. Periodically (once a minute in our case) summarize the data collected since the last successful run, and send it over to the cloud database.

Since there is a lot of code, I am only going to illustrate the salient features of the solution. A lot of the simpler programming details are left for you to figure out by looking at the code.

Did I say looking at the code? Where is it? The entire code-base for this project is on GitHub at https://github.com/anupambagchi/driver-signature-raspberry-pi. You can clone this repository on your machine and go through the details. Note that I was successful in running this code only on a Raspberry Pi running Ubuntu Mate. I had some trouble installing the required gps module on a Mac, but it runs fine on a Raspberry Pi, where it is supposed to run. Most of the modules required by the Python program can be obtained using the 'pip' command, e.g. 'pip install crypto'. To get the gps module you need to do 'sudo apt-get install python-gps'.

Where to store the data on a Raspberry Pi?

Remember that the Raspberry Pi is a small device with little memory and possibly little disk space. You need to choose a database that is nimble but effective for this scenario. We do not need any multi-threading ability, nor do we need to store months' worth of data. The database is mostly going to be used to collect transitional data that will shortly be compacted and sent over to the cloud database.

The obvious database for this purpose is the built-in SQLite database that comes with every Linux installation. It is a file-based database, which means one has to specify a file when instantiating the database. Make a clone of the repository in the '/opt' directory on your Raspberry Pi.

You will find a file called /opt/driver-signature-raspberry-pi/create_table_statements.sql and two other files with the extension '.db' which are your database files for running the job.

To initialize the database, you will need to run some initialization script. This is a one-time process on your Raspberry Pi. The SQL statements to set up the database tables are as follows:

  1. CREATE TABLE CAR_READINGS(
  2. ID INTEGER PRIMARY KEY NOT NULL,
  3. EVENTTIME TEXT NOT NULL,
  4. DEVICEDATA BLOB NOT NULL
  5. );
  6.  
  7. CREATE TABLE LAST_PROCESSED(
  8. TABLE_NAME TEXT NOT NULL,
  9. LAST_PROCESSED_ID INTEGER NOT NULL
  10. );
  11.  
  12. CREATE TABLE PROCESSED_READINGS(
  13. ID INTEGER PRIMARY KEY NOT NULL,
  14. EVENTTIME TEXT NOT NULL,
  15. TRANSMITTED BOOLEAN DEFAULT FALSE,
  16. DEVICEDATA BLOB NOT NULL,
  17. ENCKEY BLOB NOT NULL,
  18. DATASIZE INTEGER NOT NULL
  19. );
  20.  
  21.  

To run it, you need to invoke the following:

  1. $ sqlite3 obd2data.db < create_table_statements.sql

This will create the necessary tables in the database file 'obd2data.db'.

Capturing OBD data

Now let us focus on capturing the OBD data. For this we make use of a popular Python library called pyobd, which may be found at https://github.com/peterh/pyobd. There have been many forks of this library over the past eight years or so. My repository adds a lot to it - mainly for cloud processing and machine learning - so I decided not to call it a fork, since the original purpose of the library has been altered significantly. I also modified the code to work well with Python 3.

The main program that reads data from the OBD port and saves it to a SQLite3 database is 'obd_sqlite_recorder.py'. You can refer to this file under the 'src' folder while you read the following.

To invoke this program you have to pass two parameters - the name of the user and a string representing the vehicle. For the latter I generally use a convention '<make>-<model>-<year>' for example 'gmc-denali-2015'. Let us now go through the salient features of the OBD scanner.

After some basic sanity tests - such as whether the program is running as superuser, and whether the appropriate number of parameters has been passed - the next step is to scan the ports for a GSM modem and initialize it.

  1. allRFCommDevicePorts = scanRadioComm()
  2. allUSBDevicePorts = scanUSBSerial()
  3. print("RFPorts detected with devices on them: " + str(allRFCommDevicePorts))
  4. print("USBPorts detected with devices on them: " + str(allUSBDevicePorts))
  5.  
  6. usbPortsIdentified = {}
  7.  
  8. iccid = '' # Default values are blank for those that come from GSM modem
  9. imei = ''
  10.  
  11. for usbPort in allUSBDevicePorts:
  12. try:
  13. with time_limit(4):
  14. print ("Trying to connect as GSM to " + str(usbPort))
  15. gsm = GsmModem(port=usbPort, logger=GsmModem.debug_logger).boot()
  16. print ("GSM modem detected at " + str(usbPort))
  17. allUSBDevicePorts.remove(usbPort) # We just found it engaged, don't use it again
  18. iccid = gsm.query("AT^ICCID?", "^ICCID:").strip('"')
  19. imei = gsm.query("ATI", "IMEI:")
  20. usbPortsIdentified[str(usbPort)] = "gsm"
  21. print(usbPort, usbPortsIdentified[usbPort])
  22. break # We got a port, so break out of loop
  23. except TimeoutException:
  24. # Maybe this is not the right port for the GSM modem, so skip to the next number
  25. print ("Timed out!")
  26. except IOError:
  27. print ("IOError - so " + usbPort + " is also not a GSM device")

Once this is done, we need to clean up anything that is 15 days old or older, so that the database does not grow too big. The expectation is that such data should have been transmitted to the cloud long ago, so we purge it to keep the database healthy.

  1. # Open a SQLite3 connection
  2. dbconnection = sqlite3.connect('/opt/driver-signature-raspberry-pi/database/obd2data.db')
  3. dbcursor = dbconnection.cursor()
  4.  
  5. # Do some cleanup as soon as you start. This is to prevent the database size from growing too big.
  6. localtime = datetime.now()
  7. delta = timedelta(days=15)
  8. fifteendaysago = localtime - delta
  9. fifteendaysago_str = fifteendaysago.isoformat()
  10. dbcursor.execute('DELETE FROM CAR_READINGS WHERE EVENTTIME < ?', (fifteendaysago_str,))
  11. dbconnection.commit()
  12. dbcursor.execute('VACUUM')  # VACUUM takes no table name; it compacts the whole database file
  13. dbconnection.commit()

Notice that we are opening up the database connection and executing a SQL statement to clean up and purge the data that is older than 15 days.

Next it is time to connect to the OBD port. We check whether the connection can be established and, if not, exit the program. Before you run this program, you need to use the Bluetooth settings on the desktop to pair with the ELM 327 device, which should be alive and available for connection as soon as you turn the ignition on. This connection may be made manually through the Linux desktop UI, or through a program that connects automatically as soon as the machine comes up.

  1. gps_poller.start() # start it up
  2. logitems_full = ["dtc_status", "dtc_ff", "fuel_status", "load", "temp", "short_term_fuel_trim_1",
  3. "long_term_fuel_trim_1", "short_term_fuel_trim_2", "long_term_fuel_trim_2",
  4. "fuel_pressure", "manifold_pressure", "rpm", "speed", "timing_advance", "intake_air_temp",
  5. "maf", "throttle_pos", "secondary_air_status", "o211", "o212", "obd_standard",
  6. "o2_sensor_position_b", "aux_input", "engine_time", "abs_load", "rel_throttle_pos",
  7. "ambient_air_temp", "abs_throttle_pos_b", "acc_pedal_pos_d", "acc_pedal_pos_e",
  8. "comm_throttle_ac", "rel_acc_pedal_pos", "eng_fuel_rate", "drv_demand_eng_torq",
  9. "act_eng_torq", "eng_ref_torq"]
  10.  
  11. # Initialize the OBD recorder
  12. obd_recorder = OBD_Recorder(logitems_full)
  13. need_to_exit = False
  14. try:
  15. obd_recorder.connect(allRFCommDevicePorts + allUSBDevicePorts)
  16. except:
  17. exc_type, exc_value, exc_traceback = sys.exc_info()
  18. traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
  19. print ("Unable to connect to OBD port. Exiting...")
  20. need_to_exit = True
  21.  
  22. if not obd_recorder.is_connected():
  23. print ("OBD device is not connected. Exiting.")
  24. need_to_exit = True
  25.  
  26. if need_to_exit:
  27. os._exit(-1)

Notice that we first start the GPS poller, then attempt to connect to the OBD recorder, and exit the program if unsuccessful.

Now that all connections have been checked, it is time to do the actual job of recording the readings.

  1. # Everything looks good - so start recording
  2. print ("Database logging started...")
  3. print ("Ids of records inserted will be printed on screen.")
  4.  
  5. lastminute = -1
  6. need_to_exit = False
  7. while True:
  8. # It may take a second or two to get good data
  9. # print gpsd.fix.latitude,', ',gpsd.fix.longitude,' Time: ',gpsd.utc
  10. if need_to_exit:
  11. os._exit(-1)
  12.  
  13. if (obd_recorder.port is None):
  14. print("Your OBD port has not been set correctly, found None.")
  15. sys.exit(-1)
  16.  
  17. localtime = datetime.now()
  18. results = obd_recorder.get_obd_data()
  19.  
  20. currentminute = localtime.minute
  21. if currentminute != lastminute:
  22. dtc_codes = obd_recorder.get_dtc_codes()
  23. print ('DTC=', str(dtc_codes))
  24. results["dtc_code"] = dtc_codes
  25. lastminute = currentminute
  26.  
  27. results["username"] = username
  28. results["vehicle"] = vehicle
  29. results["eventtime"] = datetime.utcnow().isoformat()
  30. results["iccid"] = iccid
  31. results["imei"] = imei
  32.  
  33. loc = {}
  34. loc["type"] = "Point"
  35. loc["coordinates"] = [gpsd.fix.longitude, gpsd.fix.latitude]
  36. results["location"] = loc
  37. results["heading"] = gpsd.fix.track
  38. results["altitude"] = gpsd.fix.altitude
  39. results["climb"] = gpsd.fix.climb
  40. results["gps_speed"] = gpsd.fix.speed
  41. results["heading"] = gpsd.fix.track
  42.  
  43. results_str = json.dumps(results)
  44. # print(results_str)
  45.  
  46. # Insert a row of data
  47. dbcursor.execute('INSERT INTO CAR_READINGS(EVENTTIME, DEVICEDATA) VALUES (?,?)',
  48. (results["eventtime"], results_str))
  49.  
  50. # Save (commit) the changes
  51. dbconnection.commit()
  52.  
  53. post_id = dbcursor.lastrowid
  54. print(post_id)
  55.  
  56. except (KeyboardInterrupt, SystemExit, SyntaxError): # when you press ctrl+c; these 'except' clauses close a try block that wraps the while loop in the full source
  57. print ("Manual intervention Killing Thread..." + str(sys.exc_info()[0]))
  58. need_to_exit = True
  59. except serial.serialutil.SerialException:
  60. print("Serial connection error detected - OBD device may not be communicating. "
  61. "Exiting." + str(sys.exc_info()[0]))
  62. need_to_exit = True
  63. except IOError:
  64. print("Input/Output error detected. Exiting." + str(sys.exc_info()[0]))
  65. need_to_exit = True
  66. except:
  67. print("Unexpected exception encountered. Exiting." + str(sys.exc_info()[0]))
  68. need_to_exit = True
  69. finally:
  70. exc_type, exc_value, exc_traceback = sys.exc_info()
  71. traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
  72. print(sys.exc_info()[1])
  73. gps_poller.running = False
  74. gps_poller.join() # wait for the thread to finish what it's doing
  75. dbconnection.close()
  76. print ("Done.\nExiting.")
  77. sys.exit(0)
  78.  

A few lines of this code need explanation. The readings are stored in the variable 'results'. This is a dictionary that is first populated through a call to obd_recorder.get_obd_data() [Line 18], which loops through all the required variables and measures their values. The dictionary is then augmented with the DTC codes, if any are found [Line 22]. DTC stands for Diagnostic Trouble Code - codes defined by the manufacturer to represent error conditions inside the vehicle or engine. In lines 27-31 the results dictionary is augmented with the username, vehicle and mobile SIM card parameters. Finally, in lines 34-41, we add the GPS readings.

So you see that each reading contains information from various sources - the CAN bus, SIM card, user-provided data and GPS signals.

When all data is gathered in the record, we save it in the database (Line 47-48) and commit the changes.

Uploading the data to the cloud

Note that all the data that has been saved so far has not left the machine - it is stored locally inside the machine. Now it is time to work on a mechanism to send it over to the cloud. This data must be

  1. summarized
  2. compressed
  3. encrypted

before we can upload it to our server. On the server side, that same record needs to be decrypted, uncompressed and then stored in a more persistent storage where one can do some BigData analysis. At the same time it needs to be streamed to a messaging queue to make it available for stream processing - mainly for alerting purposes.

Stability and Thread-safety

The driver for uploading data to the cloud is a cronjob that runs every minute. We could also write a program with an internal timer that runs like a daemon, but after a lot of experimentation - especially with large data-sets, I have realized that running an internal timer leads to instability over the long run. When a program runs forever, it may build up garbage in the heap over time and ultimately freeze. When a program is invoked through a cronjob, it wakes up, does its job for that moment and exits. That way it always stays out of the way of the data collection program and keeps the machine healthy.

Along the same lines, I also need to mention something about thread-safety pertaining to SQLite3. The new task that I am about to attempt is summarization of the data collected by the recorder. So I can technically use the same database, which lives in the single file obd2data.db - right? Not so fast. Because the recorder runs in an infinite loop and constantly writes data to this database, if you attempt to write another table in the same database you run into thread-safety issues and the table gets corrupted. I tried this initially, then realized it was not a stable architecture when I saw the process freeze and found data corruption. So I had to alter the design to write the summary to a different database - leaving the raw-data database in read-only mode for the summarizer.

Data Compactor and Transmitter

To accomplish the task of transmitting the summarized data to the cloud, let us write a class that fulfils it. You will find this in the file obd_transmitter.py.

The main loop that does the task is as follows:

  1. DataCompactor.collect()
  2.  
  3. # We do not want the server to be pounded with requests all at the same time,
  4. # so we add a random wait to spread the uploads over the next 30 seconds.
  5. # With a maximum wait of 30 seconds there are still about 30 seconds left in the minute to do the job (summarize + transmit).
  6. waitseconds = randint(0, 30)
  7. if with_wait_time:
  8. time.sleep(waitseconds)
  9. DataCompactor.transmit()
  10. DataCompactor.cleanup()

There are three tasks - collect, transmit and cleanup. Let us take a look at each of these individually.
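
Before diving in, here is the overall shape of the class as I read it from the calls above - just a sketch, since only fragments of obd_transmitter.py are reproduced in this post:

class DataCompactor(object):

    @staticmethod
    def collect():
        """Group all complete minutes recorded since the last run, summarize, encrypt and compress them."""

    @staticmethod
    def summarize(readings, items):
        """Compute per-minute aggregates (mean, median, mode, etc.) for the given fields."""

    @staticmethod
    def transmit():
        """POST every packet not yet marked TRANSMITTED to the cloud end-point."""

    @staticmethod
    def cleanup():
        """Delete summary packets older than 15 days."""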

Collect and summarize

[Figure: Driver Prediction Using Raspberry Pi 2 Architecture]

The following code creates a packet of data for each whole minute, summarizes it, encrypts it and compresses it, ready for transmission. There are finer details in each of these steps that I am going to explain, but let's look at the code first.

  1. # First find out the id of the record that was included in the last compaction task
  2. dbconnection = sqlite3.connect('/opt/driver-signature-raspberry-pi/database/obd2data.db')
  3. dbcursor = dbconnection.cursor()
  4. last_id_found = dbcursor.execute('SELECT LAST_PROCESSED_ID FROM LAST_PROCESSED WHERE TABLE_NAME = "CAR_READINGS" LIMIT 1')
  5.  
  6. lastId = 0
  7. try:
  8. first_row = next(last_id_found)
  9. for row in chain((first_row,), last_id_found):
  10. pass # do something
  11. lastId = row[0]
  12. except StopIteration as e:
  13. pass # 0 results
  14.  
  15. # Collect data till the last minute last second, but not including the current minute
  16. nowTime = datetime.utcnow().isoformat() # Example: 2017-05-14T19:51:29.071710 in ISO 8601 extended format
  17. # nowTime = '2017-05-14T19:54:58.398073' # for testing
  18. timeTillLastMinuteStr = nowTime[:17] + "00.000000"
  19. # timeTillLastMinute = dateutil.parser.parse(timeTillLastMinuteStr) # ISO 8601 extended format
  20.  
  21. dbcursor.execute('SELECT * FROM CAR_READINGS WHERE ID > ? AND EVENTTIME <= ?', (lastId,timeTillLastMinuteStr))
  22.  
  23. allRecords = []
  24. finalId = lastId
  25. for row in dbcursor:
  26. record = row[2]
  27. allRecords.append(json.loads(record))
  28. finalId = row[0]
  29.  
  30. if lastId == 0:
  31. # print("Inserting")
  32. dbcursor.execute('INSERT INTO LAST_PROCESSED (TABLE_NAME, LAST_PROCESSED_ID) VALUES (?,?)', ("CAR_READINGS", finalId))
  33. else:
  34. # print("Updating")
  35. dbcursor.execute('UPDATE LAST_PROCESSED SET LAST_PROCESSED_ID = ? WHERE TABLE_NAME = "CAR_READINGS"', (finalId,))
  36.  
  37. #print allRecords
  38. dbconnection.commit() # Save (commit) the changes
  39. dbconnection.close() # And close it before exiting
  40. print("Collecting all records till %s comprising IDs from %d to %d ..." % (timeTillLastMinuteStr, lastId, finalId))
  41.  
  42. encryptionKeyHandle = open('encryption.key', 'r')
  43. encryptionKey = RSA.importKey(encryptionKeyHandle.read())
  44. encryptionKeyHandle.close()
  45.  
  46. # From here we need to break down the data into chunks of each minute and store one record for each minute
  47. minutePackets = {}
  48. for record in allRecords:
  49. eventTimeByMinute = record["eventtime"][:17] + "00.000000"
  50. if eventTimeByMinute in minutePackets:
  51. minutePackets[eventTimeByMinute].append(record)
  52. else:
  53. minutePackets[eventTimeByMinute] = [record]
  54.  
  55. # print (minutePackets)
  56. summarizationItems = ['load', 'rpm', 'timing_advance', 'speed', 'altitude', 'gear', 'intake_air_temp',
  57. 'gps_speed', 'short_term_fuel_trim_2', 'o212', 'short_term_fuel_trim_1', 'maf',
  58. 'throttle_pos', 'climb', 'temp', 'long_term_fuel_trim_1', 'heading', 'long_term_fuel_trim_2']
  59.  
  60. dbconnection = sqlite3.connect('/opt/driver-signature-raspberry-pi/database/obd2summarydata.db')
  61. dbcursor = dbconnection.cursor()
  62. for minuteStamp in minutePackets:
  63. minutePack = minutePackets[minuteStamp]
  64. packet = {}
  65. packet["timestamp"] = minuteStamp
  66. packet["data"] = minutePack
  67. packet["summary"] = DataCompactor.summarize(minutePack, summarizationItems)
  68.  
  69. packetStr = json.dumps(packet)
  70.  
  71. # Create an AES encryptor
  72. aesCipherForEncryption = AESCipher()
  73. symmetricKey = Random.get_random_bytes(32) # generate a random key
  74. aesCipherForEncryption.setKey(symmetricKey) # and set it within the encryptor
  75. encryptedPacketStr = aesCipherForEncryption.encrypt(packetStr)
  76.  
  77. # Compress the packet
  78. compressedPacket = base64.b64encode(zlib.compress(encryptedPacketStr)) # Can be transmitted
  79. dataSize = len(packetStr)
  80.  
  81. # Now do asymmetric encryption of the key using PKS1_OAEP
  82. pks1OAEPForEncryption = PKS1_OAEPCipher()
  83. pks1OAEPForEncryption.readEncryptionKey('encryption.key')
  84. symmetricKeyEncrypted = base64.b64encode(pks1OAEPForEncryption.encrypt(symmetricKey)) # Can be transmitted
  85.  
  86. dbcursor.execute('INSERT INTO PROCESSED_READINGS(EVENTTIME, DEVICEDATA, ENCKEY, DATASIZE) VALUES (?,?,?,?)',
  87. (minuteStamp, compressedPacket, symmetricKeyEncrypted, dataSize))
  88.  
  89. # Save this list to another table
  90. dbconnection.commit() # Save (commit) the changes
  91. dbconnection.close() # And close it before exiting

To do some book-keeping (lines 2 to 13), I keep the last-processed Id in a separate table. Every time I successfully process a bunch of records, I save the last-processed Id in this table so that the next run can pick up from there. Remember, this program is triggered from a cronjob that runs every minute. You will find the cron description in the file crontab.txt under the scripts directory; a typical entry might look like the sketch below.
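
A hypothetical entry (the real one ships as scripts/crontab.txt in the repository; the script path and log location below are assumptions):

# m h dom mon dow command
* * * * * /usr/bin/python /opt/driver-signature-raspberry-pi/obd_transmitter.py >> /var/log/obd_transmitter.log 2>&1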

Then we gather all the new records (lines 15 to 40) from the CAR_READINGS table into an array allRecords, where each item is a rich document extracted from the JSON payload. One important point to note is that we do not include the current minute - since it may be incomplete. In lines 42 to 44 we load the public encryption key, and in lines 46 to 58 we break the collected records into per-minute buckets and define the list of fields that need to be summarized. In line 60 we open a connection to a new database (stored in a different file - obd2summarydata.db) to hold the summary data.

Lines 62 to 87 do the task of actually creating the summarized packets. Each packet has three fields - the time stamp (only the minute, no seconds), the packet of all data collected during that minute, and the summary data (i.e. aggregates over the minute). First the packet is created using a summarize function that I will describe later. Then it is encrypted with AES using a randomly generated symmetric key (Line 73). Since the data packet is of arbitrary size, we use a hybrid scheme: the packet itself is encrypted with the random symmetric key, and that key is sent to the server in encrypted form so the server can decrypt the packet. The encrypted packet is compressed (Line 78) to prepare it for transmission. The last step is to encrypt the symmetric key itself so that it can be sent to the server in the same payload. We use PKCS#1 OAEP encryption for this with a public key (encryption.key) stored on the device; the matching private key (decryption.key) lives on the server. The eventtime (whole minute), compressed/encrypted packet, encrypted key and the data size are saved as a record in the table PROCESSED_READINGS (Lines 86-87).
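
The AESCipher and PKS1_OAEPCipher wrappers themselves are not reproduced in this post, so here is a minimal sketch of the same hybrid scheme written directly against PyCryptodome; the padding and mode choices here are assumptions, not necessarily what the repository uses:

import base64
import zlib
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad


def hybrid_encrypt(packet_str, public_key_path):
    # 1. Encrypt the packet with a one-time AES-256 key (CBC mode, IV prepended)
    session_key = get_random_bytes(32)
    iv = get_random_bytes(16)
    aes = AES.new(session_key, AES.MODE_CBC, iv)
    ciphertext = iv + aes.encrypt(pad(packet_str.encode('utf-8'), AES.block_size))

    # 2. Compress and base64-encode the ciphertext so it can travel as text
    payload = base64.b64encode(zlib.compress(ciphertext))

    # 3. Wrap the AES key with the RSA public key (PKCS#1 OAEP)
    rsa_key = RSA.import_key(open(public_key_path, 'rb').read())
    wrapped_key = base64.b64encode(PKCS1_OAEP.new(rsa_key).encrypt(session_key))
    return payload, wrapped_key

On the server the steps run in reverse: unwrap the key with the private key, decompress, then AES-decrypt - which is what the end-point code later in this article does.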

Note that when the packet is created you have a choice to send only the summarized data, or the entire set of raw records along with the summary. Obviously, if you want to save bandwidth you would do most of the "edge-processing" work on the Raspberry Pi itself and only send the summary record each time. However, in this experiment I wanted to do some additional work on the cloud - at a finer granularity than the once-a-minute scenario. As shown in part 3 of this series of articles, I actually do the summarization once every 15 seconds for driver signature analysis. So I needed to send all the raw data as well as the summary in my packet - thereby increasing the bandwidth requirements. However, the compression helped a lot in reducing the size of the original packet, by almost 90%.

Data Aggregation

Let me now describe how the summarization is done. This is the "edge-computing" part of the entire process that is difficult to do within generic devices. Any IoT device (CalAmp, for example) will be able to do most of the work pertaining to capturing OBD data and transmitting it to the cloud. But those devices are perhaps not capable enough to do the summarization - which is why one needs a more powerful computing machine like a Raspberry Pi to do the job. All I do for summarization is the following:

  1. summary = {}
  2. for item in items:
  3. summaryItem = {}
  4. itemarray = []
  5. for reading in readings:
  6. if isinstance(reading[item], (float, int)):
  7. itemarray.append(reading[item])
  8. # print(itemarray)
  9. summaryItem["count"] = len(itemarray)
  10. if len(itemarray) > 0:
  11. summaryItem["mean"] = numpy.mean(itemarray)
  12. summaryItem["median"] = numpy.median(itemarray)
  13. summaryItem["mode"] = stats.mode(itemarray)[0][0]
  14. summaryItem["stdev"] = numpy.std(itemarray)
  15. summaryItem["variance"] = numpy.var(itemarray)
  16. summaryItem["max"] = numpy.max(itemarray)
  17. summaryItem["min"] = numpy.min(itemarray)
  18.  
  19. summary[item] = summaryItem
  20.  
  21. return summary

Look at line 56 of the previous block of code. You will see an array of items describing all the items that we need to summarize. This is in the variable summarizationItems. For each item in this list, we need to find the mean, median, mode, standard deviation, variance, maximum and minimum during each minute (Lines 11 to 17). The summarized items are appended to each record before it is saved to the summary database.
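
As a quick illustration, assuming DataCompactor.summarize wraps the loop shown above, feeding it two fake readings would produce something like this:

from obd_transmitter import DataCompactor  # module name assumed from the file name mentioned earlier

readings = [
    {"speed": 10.0, "rpm": 1500.0},
    {"speed": 20.0, "rpm": 1700.0},
]
summary = DataCompactor.summarize(readings, ["speed", "rpm"])
# summary["speed"] would then contain:
# {'count': 2, 'mean': 15.0, 'median': 15.0, 'mode': 10.0,
#  'stdev': 5.0, 'variance': 25.0, 'max': 20.0, 'min': 10.0}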

Transmitting the data to the cloud

To transmit the data over to the cloud you need to first set up an end-point. I am going to show you later how you can do that on the server. For now, let us assume that you already have that available. Then from the client side you can do the following to transmit the data:

  1. base_url = "http://OBD-EDGE-DATA-CATCHER-43340034802.us-west-2.elb.amazonaws.com" # for accessing it from outside the firewall
  2.  
  3. url = base_url + "/obd2/api/v1/17350/upload"
  4.  
  5. dbconnection = sqlite3.connect('/opt/driver-signature-raspberry-pi/database/obd2summarydata.db')
  6. dbcursor = dbconnection.cursor()
  7. dbupdatecursor = dbconnection.cursor()
  8.  
  9. dbcursor.execute('SELECT ID, EVENTTIME, TRANSMITTED, DEVICEDATA, ENCKEY, DATASIZE FROM PROCESSED_READINGS WHERE TRANSMITTED="FALSE" ORDER BY EVENTTIME')
  10. for row in dbcursor:
  11. rowid = row[0]
  12. eventtime = row[1]
  13. devicedata = row[3]
  14. enckey = row[4]
  15. datasize = row[5]
  16.  
  17. payload = {'size': str(datasize), 'key': enckey, 'data': devicedata, 'eventtime': eventtime}
  18. response = requests.post(url, json=payload)
  19.  
  20. #print(response.text) # TEXT/HTML
  21. #print(response.status_code, response.reason) # HTTP
  22.  
  23. if response.status_code == 201:
  24. dbupdatecursor.execute('UPDATE PROCESSED_READINGS SET TRANSMITTED="TRUE" WHERE ID = ?', (rowid,))
  25. dbconnection.commit() # Save (commit) the changes
  26.  
  27. dbconnection.commit() # Save (commit) the changes
  28. dbconnection.close() # And close it before exiting

The end-point (which I will show you later) accepts POST requests. But you also need to configure a load-balancer that allows connections from the outside world to reach the server inside the firewall. You must establish adequate security measures to ensure that this tunnel exposes only the specific port the service listens on, and nothing else on the internal server.

Lines 1 to 7 set up the database connections to the summary database. In the table I store a flag TRANSMITTED that indicates whether a record has been transmitted. For every record that has not yet been transmitted (Line 9), I create a payload comprising the packet size, the encrypted key to use for decrypting the packet, the compressed/encrypted data packet and the eventtime (Line 17). This payload is then POSTed to the end-point (Line 18). If the transmission is successful, the flag TRANSMITTED is set to true for that packet (Lines 23-25) so that we do not attempt to send it again.

Cleanup

The cleanup operation is pretty simple. All I do is delete all records from the summary table that are more than 15 days old.

  1. localtime = datetime.now()
  2. if int(localtime.isoformat()[14:16]) == 0: # characters 14-16 are the minutes, so the cleanup runs only at the top of the hour
  3. delta = timedelta(days=15)
  4. fifteendaysago = localtime - delta
  5. fifteendaysago_str = fifteendaysago.isoformat()
  6. dbconnection = sqlite3.connect('/opt/driver-signature-raspberry-pi/database/obd2summarydata.db')
  7. dbcursor = dbconnection.cursor()
  8. dbcursor.execute('DELETE FROM PROCESSED_READINGS WHERE EVENTTIME < ?', (fifteendaysago_str,))
  9. dbconnection.commit()
  10. dbcursor.execute('VACUUM') # SQLite's VACUUM takes no table name; this reclaims the space freed by the DELETE
  11.  
  12. dbconnection.commit() # Save (commit) the changes
  13. dbconnection.close() # And close it before exiting

 

Server on the Cloud

As a final piece of this article, let me describe how to set up the server end-point. There are many pieces needed to put it together. Surprisingly, all of it is achieved in a relatively small amount of code - thanks to the crispness of the Python language.

  1. print(request.content_type)
  2. if not request.json or not 'size' in request.json:
  3. raise InvalidUsage('Invalid usage of this web-service detected', status_code=400)
  4.  
  5. size = int(request.json['size'])
  6. decoded_compressed_record = request.json.get('data', "")
  7. symmetricKeyEncrypted = request.json.get('key', "")
  8.  
  9. compressed_record = base64.b64decode(decoded_compressed_record)
  10. encrypted_json_record_str = zlib.decompress(compressed_record)
  11.  
  12. pks1OAEPForDecryption = PKS1_OAEPCipher()
  13. pks1OAEPForDecryption.readDecryptionKey('decryption.key')
  14. symmetricKeyDecrypted = pks1OAEPForDecryption.decrypt(base64.b64decode(symmetricKeyEncrypted))
  15.  
  16. aesCipherForDecryption = AESCipher()
  17. aesCipherForDecryption.setKey(symmetricKeyDecrypted)
  18.  
  19. json_record_str = aesCipherForDecryption.decrypt(encrypted_json_record_str)
  20.  
  21. record_as_dict = json.loads(json_record_str)
  22.  
  23. # Add the account ID to the reading here
  24. record_as_dict["account"] = account
  25.  
  26. #print record_as_dict
  27. post_id = mongo_collection.insert_one(record_as_dict).inserted_id
  28. print('Saved as Id: %s' % post_id)
  29.  
  30. producer = KafkaProducer(bootstrap_servers=['your.kafka.server.com:9092'],
  31. value_serializer=lambda m: json.dumps(m).encode('ascii'),
  32. retries=5)
  33. # send the individual records to the Kafka queue for stream processing
  34. raw_readings = record_as_dict["data"]
  35. counter = 0
  36. for raw_reading in raw_readings:
  37. raw_reading["id"] = str(post_id) + str(counter)
  38. raw_reading["account"] = account
  39. producer.send("car_readings", raw_reading)
  40. counter += 1
  41.  
  42. producer.flush()
  43. # send the summary to the Kafka queue in case there is some stream processing required for that as well
  44. raw_summary = record_as_dict["summary"]
  45. raw_summary["id"] = str(post_id)
  46. raw_summary["account"] = account
  47. raw_summary["eventTime"] = record_as_dict["timestamp"]
  48. producer.send("car_summaries", raw_summary)
  49.  
  50. producer.flush()
  51. return jsonify({'title': str(size) + ' bytes received'}), 201

I decided to use MongoDB as persistent storage for records and Kafka as the messaging server for streaming. The following tasks are done in order in this function:

  1. Check for invalid usage of this web-service, and raise an exception if the request is illegal (Lines 1 to 3). A simple test for the existence of 'size' in the payload is used for this.
  2. Decompress the packet (Lines 9 to 10)
  3. Decrypt the transmission key using the private key (decryption.key) stored on the server (Lines 12 to 14)
  4. Decrypt the data packet (Line 19)
  5. Convert the JSON record to an internal Python dictionary for digging deeper into it (Line 21)
  6. Save the record in MongoDB (Line 27)
  7. Push the same records into a Kafka messaging queue (Lines 30 to 50)

This functionality is exposed as a web-service using a Flask server. You will find the rest of the server code in the file flaskserver.py in the folder 'server'.
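
For orientation, here is a minimal sketch of how the handler above might be wired up; the route pattern is inferred from the client URL (/obd2/api/v1/<account>/upload), and the actual flaskserver.py in the repository may differ:

from flask import Flask, request, jsonify

app = Flask(__name__)


class InvalidUsage(Exception):
    def __init__(self, message, status_code=400):
        super(InvalidUsage, self).__init__(message)
        self.message = message
        self.status_code = status_code


@app.errorhandler(InvalidUsage)
def handle_invalid_usage(error):
    # Return the error message and status code as JSON
    return jsonify({'error': error.message}), error.status_code


@app.route('/obd2/api/v1/<account>/upload', methods=['POST'])
def upload(account):
    # ... body as shown in the listing above: decompress, decrypt,
    # save to MongoDB and push to Kafka, then return 201 ...
    return jsonify({'title': 'received'}), 201


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)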

I have covered the salient features to put this together, skipping the other obvious pieces which you can peruse yourself by cloning the entire repository.

Conclusion

I know this has been a long post, but I needed to cover a lot of things. And we have not even started working on the data-science part. You may have heard that a data scientist spends 90% of the time preparing data. Well, this task was even bigger - we had to set up the hardware and software to generate raw real-time data and store it in real-time before we could even start thinking about data science. If you are curious to see a sample of the collected data, you can find it here.

But now that this work is done, and we have taken special care that the generated data is in a nicely formatted form, the rest of the task should be easier. You will find the data science related stuff in the third and final episode of this series.

Go to Part 3 of this series

The Raspberry Pi is an extremely interesting invention. It is a full-fledged Linux box (literally can be caged inside a plastic box) and it basically allows you to run any program to connect to any other communicating device around it through cables or a Bluetooth adapter. I am going to show you how to build your own system to hook up a Raspberry Pi to your car, then extract diagnostic information from the CAN bus, upload that to the cloud, and then use a streaming API to predict who is driving the vehicle using a learning model created through a periodic backend process. To understand this, you need some basic electrical engineering skills and some exposure to Python programming.

Since all this work is pretty long, I intend to break it up into three parts which I am going to put into three different articles as follows:

  1. Driver Signatures from OBD Data captured using a Raspberry Pi: Part 1 (Building your Raspberry Pi setup)
  2. Driver Signatures from OBD Data captured using a Raspberry Pi: Part 2 (Reading real-time data and uploading to the cloud)
  3. Driver Signatures from OBD Data captured using a Raspberry Pi: Part 3 (Analyzing driver data and making predictions)

You should go through these articles in order. You are currently reading Part 1 of this series.

Building your Raspberry Pi setup

Note that there are several web-sites where people have described how to set up a Raspberry Pi for reading Onboard Diagnostic (OBD) data. So my article here is mostly a repetition. I created my setup by reading those articles and watching YouTube videos. You should do that too. The only difference between my article and theirs is that I provide a motive for doing all this work and take it further - i.e. creating driver signatures. The articles I have seen on other blog sites take you through the process of building a Raspberry Pi setup, but their story ends right there. In my case, that is just chapter one - the more interesting work of uploading to the cloud and analyzing that data follows after that. For completeness' sake, I thus have to describe a few things about the setup that pertain to hardware. Without that you cannot even start the project. So just follow along. If you are the handyman type of guy (or gal), roll up your sleeves and build it. It is fun!

To cut the description short, here are a few articles describing the setup with enough pictures to get you started.

(a) https://www.cowfishstudios.com/blog/obd-pi-raspberry-pi-displaying-car-diagnostics-obd-ii-data-on-an-aftermarket-head-unit

(b) https://mtantawy.com/everything-you-need-to-know-to-integrate-your-raspberry-pi-into-your-car/#turn-your-raspberry-pi-into-car-diagnostic-tool

I started off by going through the first article given here. My car is slightly old, so I had no way of using the car's own monitor as a dashboard. So I decided to hook up an independent monitor to my Raspberry Pi that I can carry along with me as a "kit". Moreover, I needed to try this setup on different cars and different drivers to build any meaningful model - so it was important to make my setup independent of any particular vehicle. What I wanted was to hop into any car, start my system using the car battery, hook up my Raspberry Pi to the OBD port's ELM 327 adapter over Bluetooth and run my program. My setup is also a bit different from the others' since I needed to upload all that data to the cloud while the car was in motion. Remember, I said I want to do real-time prediction of the driver, so all data that is generated has to go to the cloud in real-time (or near real-time), where we apply a prediction step on a pre-built model.

Bill of Materials (Main Items)

Given below are the accessories you need to purchase for a complete setup. I am including a picture of each item as well so that you can match it as closely as possible when you make your purchases.

Raspberry Pi 3

The first thing you want is a Raspberry Pi 3 computer. Here is how it looks. Search for one on the web and purchase it from your country's online retailer.


You will also need to buy a plastic case for this board to protect the components.

Notice that the Pi shown above has an HDMI port for the monitor, an ethernet port, 4 USB ports and an SD card slot.

Monitor

You will need a 7-inch car monitor. Most people hook up the Raspberry Pi to the inbuilt monitor of the car, but my use-case is multi-car. So I wanted to keep my setup independent of any car - thus I went with a separate monitor. Here is what I used for this purpose.

This does not come with an HDMI cable, so I had to purchase a separate cable myself to connect to the Pi. Note that this came with a 110V adapter, so you may choose to either buy a cable to use with your car's cigarette lighter slot, or buy an inverter that converts 12V DC to 110V AC and use that as your universal power source for all situations.

Inverter (Optional)

I decided to go with the DC-to-AC inverter since it also [1] acts as a spike buster in the car (the voltage spikes when you start the engine), and [2] gives you a single setup for your power needs, whether you are inside the car or sitting in the lab for post-processing or development.

Here is a picture of the inverter I am using:

 

Keyboard

The next accessory you need is a keyboard to operate your machine. I went with an integrated keyboard/trackpad that goes well with the size of the Raspberry Pi. Here is what I liked:

ELM 327 Adapter

To connect to your car, your Raspberry Pi needs a Bluetooth adapter. The ELM 327 standard allows a serial connection to your car's CAN bus, so this adapter exposes the CAN bus data over a serial link. Note that this is a slow connection (being serial), but we will try to manage with it. The connector is pretty cheap (around USD 10 or so). I have seen more advanced connectors from Kvaser in the range of $300 or so that can read data at a much faster rate. Our connector will be able to read data at a reasonable rate, but not fast enough for highly accurate readings. We will live with it.

Here is how it looks.

You need to insert this into your car's OBD2 port, which is generally found under the steering wheel, just above your left foot. The port looks like this.
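
Once the adapter is plugged in and paired over Bluetooth, a quick sanity check from the Pi could look like the sketch below (assuming the python-OBD library and an rfcomm serial binding at /dev/rfcomm0; your device path may differ):

import obd

connection = obd.OBD("/dev/rfcomm0")  # open the serial link to the ELM327 adapter
if connection.is_connected():
    response = connection.query(obd.commands.RPM)  # ask for engine RPM
    print(response.value)                          # e.g. 1850.0 revolutions_per_minute
else:
    print("Could not talk to the adapter - check pairing and the rfcomm binding")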

GSM modem

Since we intend to upload our data to the cloud in real-time, we will need a GSM modem. There are many choices and the software setup is different for different vendors. I went with a TELUS modem that uses a Huawei chip internally.

You will notice that this is where our setup goes beyond the other setups found on the internet.

GPS Antenna

Another important component we need is the GPS antenna. This is not really necessary for the driver signature part, but it is very important for visualization of data and to do other kinds of analysis - like speed and acceleration calculations from GPS data. Here is one that goes with the Raspberry Pi.

You need to follow the manual for connecting this to your Raspberry Pi. I had to do some soldering work in order to make the right connections. The software to drive it also needs to be installed via an 'apt-get' command.
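
On Raspbian this is typically something like sudo apt-get install gpsd gpsd-clients python-gps (exact package names vary by release); that provides the gpsd service which the recorder's gps_poller thread reads from.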

Complete Kit

My complete kit after putting together all the components looks like this:

You can see that I decided to mount all of this on a two-layered particle board separated by plastic pipes. Even though it looks big, it is not clunky, and there is enough room for expansion. I kept it in two layers so that all the power cables are hidden underneath the top board and stay out of view. You will need some velcro strips to keep the power supply units from falling off. I also used a USB extender (Inateck) to keep my USB devices from clogging up the room around the Raspberry Pi.

This setup is clean and portable. You can use it to work and debug code inside your car or in the lab.

Conclusion

My setup was inspired by many articles on the web and on YouTube. You should also consult other links that show you how to do it. However, this is not the main focus of this series - my main objective is to use this setup for a data science purpose, and I chose driver signature analysis. The following two articles delve deeper into the software setup - both on the client side and the server side - to solve this problem. You will find the links to the next two articles below. Happy reading!

Go to Part 2 of this series

 

Let us try to apply the principles of churn prediction to a use-case involving sensors and devices. In this post we will apply some machine learning principles to predict which devices are likely to fail in the future. The scenario is as follows:

Problem Description

A company is in the business of allocating car parking spaces to visitors. For that, it needs sensors installed at each parking space. These sensors sit in the ground right under where the car is supposed to park. When a car comes over it, the sensor detects the presence of the car through a set of five proximity sensors that give readings in analog voltages; this set of five readings indicates whether there is a car present above it. The data is transmitted periodically over the network to the server, where it is recorded. One can also send control messages over the cellular network from the servers to the sensors to take some action, like calibrating or rebooting the device.

[Figure: Sensor under the car senses an object above it by sending ultra-sonic waves]

These sensors need to be really hardy. They can be driven upon by cars. They have to weather direct sunlight when there is no vehicle over them, as well as rain, and the battery must last a long time. Over time, though, the battery becomes weak, or the sensors themselves lose their ability to sense correctly, and then finally die. The engineers who operate these sensors have noticed that every time they send a signal to reboot the devices, some of them do not show up any more. This happens because during a power-cycle reboot the sensor goes through some stress and, depending on the network connection in that area, it can take up to 24 hours for the device to reconnect to the network and send its data. Thus it is known that when sensors die, it is generally right after a power-cycle reboot, and if data has not been received from a sensor for 24 hours, that device is most likely dead. We have a record of all sensors that were operational before and after the last reboot operation. A few of the devices stopped transmitting after that reboot event. The problem is to predict which devices are likely to die during the next reboot event.

Analyzing the data set

Before we attempt to solve this problem, let us take a look at some sample data. Note that in real-life situations the data is going to be noisy, messy and sometimes inaccurate. As a data scientist, one has to make a judicious choice as to whether it should be used or not. This elimination process must be applied to columns as well as rows.

We will analyze the columns one-by-one and make a decision to keep it or discard it.

  1. The first column appears to be just a row Id. Thus it has no contribution to the data-model, so we discard it.
  2. The second column (mac) is the device id. This is the unique Id of the device, so we need to use it for predicting which devices are likely to die during the next power-cycle reboot.
  3. The third column (mj) does not change, so it has no effect on the data-model; thus it is discarded.
  4. The fourth column (fw) is the same for all rows - a candidate for discarding.
  5. The fifth column (time) is important. It tells us when the device data was received - thus we can actually see which devices were alive and dead after the last power cycle reboot.
  6. The sixth column (uamps) has no value since it is the same for all rows; discard it.
  7. The seventh column (batt_v) appears to be the most important field of all. It is the battery voltage which has the highest impact on the life of a device stranded in the open space.
  8. The eighth field (cc) is a numeric measured value from the environment. We will keep it and observe if it has any impact on the life of the device.
  9. The ninth field (temp) is the temperature of the device at the time the reading was taken. We'll keep it.
  10. The tenth field (diag) may be discarded since it is zero all across.
  11. The eleventh field (mahrs_consumed) may also be discarded.
  12. The twelfth field (avg_lifetime_uamps) is not coming through correctly, so we discard it.
  13. The thirteenth field (missed_payloads) seems like an interesting field since it would have indicated the signal strength around that area, but unfortunately it is not coming through; so we discard it.
  14. The fourteenth field (total_time_sec) is zero all across; so discard it.
  15. The fifteenth field (l0) does have some significance since it is a sensor parameter. We'll keep it.
  16. The sixteenth field (l1) is also a sensor parameter with changing values. We'll keep that as well.
  17. The seventeenth field (prod_date) is the date the device was manufactured. We probably don't have much use for it at the moment, so it will be ignored.
  18. The eighteenth field (vreg) appears to be important since it has values that change; so we'll keep it.
  19. The nineteenth field (sleep) has a constant value of zero; discard it.
  20. The twentieth field (eco) may be kept even though it seems to have little variation.
  21. The twenty-first field (esr_samples) is interesting since it has four different numeric values embedded inside it. We believe these values can be separated (using ':' as a delimiter) and these can be used individually. Most likely these are the raw voltage readings from the sensors which will have a huge impact on the life of the product. We'll use all four values found in this field.
  22. The last field (esr_timing) is the setting on the device when the readings in column 21 were taken. This is constant all through, so we discard it for our analysis.

The engineering team has also notified us that the last reboot operation happened on September 29th, 2016.

Problem Approach

Before we start our analysis you need to get the data-set. Get the GZIP file here. Download the file given in the link above and save it to some local folder on your computer. Open up a terminal and change directory to the folder where you downloaded the file. Then unzip the file using:

gunzip sensor_data_001.csv.gz

You will see the data in the file sensor_data_001.csv. You can readily observe that this is not a standard supervised learning problem. For any supervised learning problem we need an X-vector and a y-vector for the training set. We do not know yet how to get the output y-variable since it is not given in the problem directly. Thus for training purposes, the output variable must be derived in some way.

Determining the output y-vector

Look at the data-set carefully. There are 69,765 rows in this data-set. To determine which devices have died during the last reboot operation, one has to first sort this list by date. Fortunately, I have already sorted this data for you by date, so you should be able to see the records from the beginning in order. You will notice that this data-set contains records only for a few days. We need to partition this data-set into two parts.

  1. All records prior to Sep 29th, since that was the day the last reboot operation happened. This will be our training set, which helps us build our prediction model.
  2. All records after Sep 29th, to determine which devices survived the reboot operation. This will be our dev set on which we will apply our model to determine the "weak" devices that are likely to die after the next reboot operation.

Having done that, we may be able to determine which devices have died due to the reboot, and thus be able to re-create an additional column for the y-vector.

Problem Solution

Let us get started with the programming exercise. To solve this problem, we will use Python with Pandas and Scikit-Learn. Our goal is to write the entire solution in one file without multiple passes over the data. Thus it will involve the use of Pandas filtering - the results of which stay in memory.

Before you attempt it on your own machine, you need to ensure that you have the necessary libraries to run this program. Open up a terminal shell and run the following:

pip install scikit-learn
pip install pandas

After installing these necessary packages, we are ready to begin. I will describe the different stages of the task as different steps.

Step 1: Set up import statements

Let us first import a few necessary libraries:

  1. from __future__ import division
  2. import pandas as pd
  3. import numpy as np
  4. import datetime
  5. from sklearn.ensemble import RandomForestClassifier
  6. from sklearn.metrics import confusion_matrix
  7. from sklearn.metrics import f1_score
  8. from sklearn.metrics import precision_recall_fscore_support as prf
  9. import warnings
  10. warnings.filterwarnings("ignore")

Step 2: Read the CSV file and identify the column names.

We now need to open the data file and read the column names. Pandas has handy functions to do that.

  1. print("Reading CSV data from current directory...")
  2. all_sensor_data_df = pd.read_csv('sensor_data_001.csv')
  3. col_names = all_sensor_data_df.columns.tolist()

Step 3: Build the data-frames

The next step is to build the data frames in Pandas. While doing that we need to do certain transformations since some fields that contain numbers or dates are appearing in the data file as strings.

  1. First, we need to convert the string values found in the column time into proper timestamps, stored in a new column time_stamp.
  2. Secondly, we need to pluck out the individual values from the field esr_samples into four separate values. These are the individual sensor voltage readings that we believe have a significant impact on the life of the sensor. Both steps are shown in the code below:

  1. print("Building dataframes...")
  2. all_sensor_data_df['time_stamp'] = pd.to_datetime(all_sensor_data_df['time'])
  3. X1 = all_sensor_data_df['esr_samples'].str.split(pat=':', expand=True)
  4. X1 = X1.rename(columns={0: 'esr0', 1: 'esr1', 2: 'esr2', 3: 'esr3'})
  5. all_sensor_data_df = pd.concat([all_sensor_data_df, X1], axis=1)

Notice that we are creating four new columns in the data frame with names esr0, esr1, esr2 and esr3. Then we are concatenating the newly generated data-frame with the original data-frame making it wider.

Step 4: Choosing the training set

Recall that I said we need to look at the rows prior to the last reboot operation on Sep 29 to determine which devices were alive then, and compare them with those found later. This helps us figure out which devices died during the reboot.

  1. cutoff = datetime.datetime(2016, 9, 29)
  2. still_running_sensors_df = all_sensor_data_df.loc[all_sensor_data_df['time_stamp'] >= cutoff]
  3.  
  4. all_sensor_macs = all_sensor_data_df.mac.unique()
  5. alive_sensor_macs = still_running_sensors_df.mac.unique()

I created a Pandas filter to split the rows at the cut-off date. The unique() function applied to the full data-set gives all the device Ids that ever reported. Applying the same unique() function to the rows on or after the cut-off date gives the sensors that were still alive after the reboot.

  1. running_before_reboot_df = all_sensor_data_df.loc[all_sensor_data_df['time_stamp'] < cutoff]
  2. running_before_reboot_df.loc[:, 'isalive'] = np.where(running_before_reboot_df['mac'].isin(alive_sensor_macs), 1, 0)  # alive = 1, dead = 0

My goal is to create another y-column (isalive) with values 0 or 1 - where 0 represents dead and 1 represents alive. I do so using NumPy's where() function and adding that column to the data-frame containing the rows prior to Sep 29. This will be my training set.

For your reference here is the full set of columns in the original dataset:

  1. # all_columns = ['Unnamed: 0', 'mac', 'mj', 'fw', 'time', 'uamps', 'batt_v', 'cc', 'temp', 'diag', 'mahrs_consumed', 'avg_lifetime_uamps',
  2. 'missed_payloads', 'total_time_sec', 'l0', 'l1', 'prod_date', 'vreg', 'sleep', 'eco', 'esr_samples', 'esr_timing']

Step 5: Setting up the training task

Now it is time to set up the training task. I first create a list of columns to be used for the output vector, and also another set for all the input variables - or features.

  1. class_variables = ['isalive']
  2. features = ['eco', 'batt_v', 'missed_payloads', 'esr0', 'esr1', 'esr2', 'esr3']

A lot of what follows next is boiler-plate code that you can use for any generic data-science problem. I have been using this boiler-plate as my starting point for any coding problem involving Pandas and Scikit-Learn.

Note that we have all our training data in the data-frame named running_before_reboot_df. Typically, we would take a small sample out of this data-frame to verify the accuracy of the prediction. In this situation we can use the entire data-frame as our training set since we have already separated out the validation set (those records after Sep 29). But for demonstration purposes, I am going to still create a test set from the data-frame to create the confusion matrix and an F-score.

  1. train_df, test_df = np.split(running_before_reboot_df, [int(.8 * len(running_before_reboot_df))])

Above, I am creating two data-frames, one for training and another for testing. 80% of the data is used for training.

  1. y_train = train_df[class_variables]
  2. X_train = train_df[features]
  3. X_train.fillna(value=0, inplace=True)
  4.  
  5. y_test = test_df[class_variables]
  6. X_test = test_df[features]
  7. X_test.fillna(value=0, inplace=True)

To avoid any problems with missing values, I am also filling with zeros all fields that do not have a value.

Step 6: Setting up the validation set for prediction

Similarly, one can set up the prediction set by taking records that were after the cut-off date.

  1. X_prediction_set = still_running_sensors_df[features]
  2. X_prediction_set.fillna(value=0, inplace=True)

Step 7: Training the data-model

Now it is time to train the model with the data-frames created so far. We choose the Random Forest classifier for this purpose. Through experience I have found that ensemble learning gives good results, since it trains many simple models on different views of the data and aggregates their votes instead of relying on a single model.

  1. print('Building RandomForest Classifier ...')
  2. model = RandomForestClassifier(n_estimators=20, min_samples_leaf=1, max_depth=20, min_samples_split=2, random_state=0)
  3. model.fit(X_train, y_train.values.ravel())
  4. print('... built')

Step 8: Validating the model and finding accuracy

For data scientists most of the time is spent in validating and tuning the model. I am giving below the necessary code to print out the Confusion Matrix along with the Precision, Recall and F-score.

  1. y_pred = model.predict(X_test)
  2. y_test_as_matrix = y_test.values # .as_matrix() has been removed from newer Pandas versions; .values returns the same NumPy array
  3.  
  4. print('Confusion Matrix')
  5. print(confusion_matrix(y_test, y_pred))
  6.  
  7. model_score = model.score(X_test, y_test_as_matrix)
  8.  
  9. print('Features: ' + str(features))
  10. print('Feature importances: ', model.feature_importances_)
  11. print('Model Score: %f' % model_score)
  12.  
  13. print("F1 Score with macro averaging:" + str(f1_score(y_test, y_pred, average='macro')))
  14. print("F1 Score with micro averaging:" + str(f1_score(y_test, y_pred, average='micro')))
  15. print("F1 Score with weighted averaging:" + str(f1_score(y_test, y_pred, average='weighted')))
  16.  
  17. print('Precision, Recall and FScore')
  18. precision, recall, fscore, _ = prf(y_test, y_pred, pos_label=1, average='micro')
  19. print('Precision: ' + str(precision))
  20. print('Recall:' + str(recall))
  21. print('FScore:' + str(fscore))

If you run this, you will see that the model score comes out at about 82% accuracy. This is not perfect, but it is perhaps the best that our data-set allows. Remember that we are dealing with noisy data and trying to make a prediction on that data-set without doing any further filtering.
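
If you want to squeeze out a little more accuracy, a natural next step (not part of the original run) is a small hyper-parameter search over the forest; here is a sketch using Scikit-Learn's GridSearchCV:

from sklearn.model_selection import GridSearchCV

param_grid = {
    'n_estimators': [20, 50, 100],
    'max_depth': [10, 20, None],
    'min_samples_leaf': [1, 3, 5],
}
search = GridSearchCV(RandomForestClassifier(random_state=0), param_grid,
                      cv=3, scoring='f1_macro')
search.fit(X_train, y_train.values.ravel())
print(search.best_params_)
print(search.best_score_)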

One of the interesting results coming out of this analysis is which parameters have the most impact on a sensor's death. For that, one has to look at the feature importance numbers. Looking at them, one can see that esr3 has the most impact, followed by battery voltage, then esr2 and esr0.
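
To make that ranking explicit, you can print the importances sorted from highest to lowest:

for name, importance in sorted(zip(features, model.feature_importances_),
                               key=lambda pair: -pair[1]):
    print('%-18s %.3f' % (name, importance))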

Step 9: Making a prediction based on this data-model

Now that we have created a data-model, let us try to predict how many devices are likely to die during the next reboot.

  1. # We need to find those that are about to die, i.e. 0 value. Filter only those values that are 0
  2. X_prediction_set.loc[:, 'predicted_life'] = model.predict(X_prediction_set)
  3. final_prediction_df = pd.concat([still_running_sensors_df[['mac']], X_prediction_set], axis=1)
  4.  
  5. may_possibly_die_df = final_prediction_df.loc[final_prediction_df['predicted_life'] == 0]
  6. sensors_that_may_die = may_possibly_die_df.mac.unique()

All I am doing here is using the predict() method provided by Scikit-learn to apply the model to the prediction data-frame. Out of the predicted values of isalive, I choose only those rows that have a value of zero, i.e. the ones likely to die.

Step 10: All done, print out the device Ids

Finally it is time to print out the device Ids that were identified to be weak and likely to switch off permanently during the next reboot.

  1. print('All sensors in dataset:')
  2. print(all_sensor_macs)
  3.  
  4. print('Sensors that were still alive after 9/29 reboot event:')
  5. print(alive_sensor_macs)
  6.  
  7. print('Sensors that may possibly die in future:')
  8. print(sensors_that_may_die)

Full output of the program

Given below is the full output of the program. You can see that it provides the list of device ids that are likely to die during the next reboot.

Reading CSV data from current directory...
Building dataframes...
Building RandomForest Classifier ...
... built
Confusion Matrix
[[2218  164]
 [1956 7501]]
Features: ['eco', 'batt_v', 'missed_payloads', 'esr0', 'esr1', 'esr2', 'esr3']
Feature importances:  [0.0784046  0.1871789  0.09478354 0.12096443 0.08418511 0.13707296
 0.29741046]
Model Score: 0.820931
F1 Score with macro averaging:0.7764073908388418
F1 Score with micro averaging:0.8209308218599543
F1 Score with weighted averaging:0.8360332235994649
Precision, Recall and FScore
Precision: 0.8209308218599544
Recall:0.8209308218599544
FScore:0.8209308218599543
All sensors in dataset:
['48-98-D3' '48-95-80' '48-9B-19' '48-97-92' '48-9A-65' '48-96-F9'
 '48-9D-84' '48-9B-4C' '48-95-B8' '48-9C-B0' '48-96-8F' '48-92-CB'
 '48-95-97' '48-A9-8D' '48-96-1C' '21-BE-36' '48-94-3C' '48-94-3D'
 '49-20-4E' '48-BF-FC' '48-92-42' '48-98-4F' '48-93-41' '22-02-9F'
 '48-97-9A' '48-BE-C1' '48-9C-BF' '48-92-D7' '48-C0-70' '48-9D-47'
 '22-06-96' '21-C5-A9' '48-C0-53' '48-95-A3' '48-96-5A' '48-BE-C2'
 '48-93-50' '22-02-75' '48-9A-92' '48-9C-0C' '22-02-9D' '48-9A-58'
 '21-C9-EE' '48-93-38' '48-96-0B' '48-92-A1' '48-BE-C9' '48-99-2F'
 '22-02-9E' '48-BE-B4' '48-92-AC' '48-96-4C' '48-94-3F' '48-97-D1'
 '48-98-59' '48-97-BA' '48-BE-63' '21-C5-AB' '48-9D-08' '21-BD-F9'
 '48-95-C2' '48-97-12' '21-C5-B0' '21-C4-35' '48-94-23' '48-9D-72'
 '21-C9-D9' '49-3F-F2' '48-9A-7E' '48-94-12' '21-C3-26' '22-02-C7'
 '21-C9-FF' '48-94-47' '48-9C-AF' '48-A9-A2' '48-92-EE' '21-C9-F0'
 '48-9B-9A' '48-94-08' '48-9B-50' '22-06-A7' '48-93-FA' '48-96-6F'
 '48-95-1E' '48-9A-E1' '48-9C-2A' '48-BE-E3' '22-02-A4' '48-98-69'
 '48-94-B1' '22-02-CA' '22-04-BD' '48-92-95' '48-9B-06' '22-02-86'
 '48-9B-1B' '22-02-9C' '21-C4-4C' '21-C2-46' '48-BE-7F']
Sensors that were still alive after 9/29 reboot event:
['22-02-86' '48-BE-63' '48-92-A1' '48-95-1E' '48-BE-C9' '48-9A-7E'
 '48-BF-FC' '21-C5-B0' '48-9D-47' '48-C0-53' '48-9D-84' '22-02-A4'
 '21-C4-4C' '48-95-A3' '21-C3-26' '22-02-C7' '48-BE-C1' '48-92-D7'
 '48-BE-7F' '48-9B-4C' '22-06-96' '22-02-9F' '21-C2-46' '22-02-CA'
 '48-9A-92' '22-02-9D' '22-06-A7' '49-20-4E' '21-BD-F9' '48-BE-B4'
 '48-9B-19' '48-A9-8D' '48-94-47' '48-BE-E3' '48-9C-B0' '22-04-BD'
 '21-C5-AB' '48-92-42' '21-C4-35' '48-97-D1' '21-C9-D9' '21-C9-FF'
 '48-9B-9A' '48-BE-C2' '48-94-12' '22-02-9C' '48-93-38' '48-94-3C'
 '22-02-9E' '48-9C-2A' '48-9A-65' '22-02-75' '48-C0-70' '21-C5-A9'
 '48-9D-08' '48-9D-72' '21-BE-36' '21-C9-EE' '21-C9-F0' '49-3F-F2'
 '48-95-C2' '48-A9-A2' '48-96-F9']
Sensors that may possibly die in future:
['48-92-A1' '48-95-1E' '48-9A-7E' '48-92-D7' '48-9B-4C' '48-9B-19'
 '48-9C-B0' '48-97-D1' '48-9B-9A' '48-94-3C' '48-9C-2A' '48-9A-92'
 '48-96-F9' '48-92-42' '48-9A-65' '48-94-12' '48-95-A3' '48-95-C2'
 '48-93-38' '48-94-47']

Conclusion

I hope you have enjoyed reading this blog post. You can get the entire code as a single file. A lot of the work depends on knowing the quirks of Pandas and handling data using the Pandas functions. Most people, including myself, think in SQL first and then look up the Pandas way of doing the same job; I kept Google handy for searches as I worked through the code. Since I have shown you a lot of the common use-cases, you can take hints from this code and proceed with your own work, reusing some of the boiler-plate. If you like this article, find an error in the code, or have an alternate opinion about the approach, you are most welcome to leave a comment below.