Building Redundancy Into Data Gathering for My Stock Data API

In a previous post, I wrote about an API I created to retrieve data for stocks belonging to the Russell 2000 and S&P 500 indexes. One glaring issue with the data-gathering technique discussed in that post is that it depends on a single machine being powered on and connected to the internet to fetch the data. This shouldn’t be an issue most of the time: the script currently runs on a Raspberry Pi Model 3 that should have no trouble running for long stretches. However, you can’t be too careful, especially in situations like this where a loss of connectivity for even a few minutes causes problems in the application (e.g. missing chunks of data in the API).

Although I haven’t encountered this issue yet, it has been in the back of my mind, so I’ve created another Python script designed to run every minute (via cron) to ensure the data-collection machine is up and running. The advantage of my particular approach is that it extends to multiple machines. In fact, I have the script running every minute on two different machines to have a second line of defense against the dedicated hardware going offline. The primary disadvantage is that it doesn’t fill in the gaps when the data-gathering Python script simply fails to run. Due to inconsistencies in the script’s runtime, I’m still working out a good way to determine whether the script has completed successfully and, if not, insert the data (albeit a little later than expected).

In this post, I will discuss the simple script I’ve created to verify that the machine chosen to collect the data is running and, if it isn’t, schedule another machine to collect the data. I will also demonstrate how this can be extended to multiple machines to decrease the odds of missing data in the API. The technique developed in this post can be used to add redundancy to any mission-critical system that relies on the uptime and/or connectivity of a particular machine. The only prerequisite is that each machine is connected to the internet and listening on some port (I use port 22 since most of my machines are headless and SSH is how I manage them). That being said, I’m sure there are other ways of doing this (maybe a good solution exists with Docker/Kubernetes), but this solution is quick, easy, effective, and doesn’t require the ‘dockerization’ of your service.

Determining if a Machine is Running via sockets

As mentioned above, most of my machines are headless and managed via SSH, so finding a socket that was open and accepting connections was easy for me. If this isn’t the case for you, it’s very easy to create your own server in Python (or almost any language, really) to listen for connections on a specified port, as sketched below. The fact that the machines are always listening for connections can be exploited to determine whether they are powered on and connected to the network (both of which are required to fetch and save the stock data).
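If you don’t already have an SSH daemon (or any other service) listening, a minimal listener takes only a few lines of Python. This is just a sketch: port 2222 is an arbitrary choice, and the server does nothing with connections beyond accepting and closing them, which is all the check needs.

import socket

# minimal 'are you alive?' listener: accept connections and
# immediately close them. Port 2222 is an arbitrary choice.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 2222))
server.listen(5)

while True:
    conn, addr = server.accept()  # blocks until a client connects
    conn.close()                  # connecting at all is the signal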

The Python logic below attempts to connect to a particular socket. If the connection fails, an exception is raised, indicating that the machine is offline (or simply not listening on that port). In my case, I am assuming that the SSH server will always be running.

import socket as sock

host = 'node1'                # hostname or IP address of the machine to check
logfile = '/path/to/log.dat'

s = sock.socket(sock.AF_INET, sock.SOCK_STREAM)
s.settimeout(5)  # don't hang forever on an unresponsive host

try:
    s.connect((host, 22))
    with open(logfile, 'a') as f:
        f.write('\tSuccessfully connected to ' + host + ', disabling cron job\n')
except OSError:
    with open(logfile, 'a') as f:
        f.write('\tUnable to connect to ' + host + ', scheduling cron job\n')
finally:
    s.close()

So far, all we’re doing is attempting to connect to a remote machine and logging whether or not the attempt was successful.
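As an aside, the standard library’s socket.create_connection helper wraps the create-and-connect dance and accepts a timeout directly, which keeps the check from hanging on an unresponsive host. A minimal sketch of the same check using it:

import socket

host = 'node1'  # hostname or IP address, as before

try:
    # create_connection handles socket creation and accepts a timeout
    conn = socket.create_connection((host, 22), timeout=5)
    conn.close()
    online = True
except OSError:
    online = False

print(host, 'is', 'online' if online else 'offline')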

Using python-crontab to Enable and Disable the cronjob

Since the API’s data-fetching script runs as a cron job every 3 minutes, I was looking for a way to programmatically enable or disable a cron job, i.e. comment/uncomment the cron job’s line in the crontab. Fortunately, there is a Python library that does exactly that and has a slew of other extremely useful functionality for dealing with cron jobs: python-crontab. For this project, all that needs to be done is find the job used for fetching the stock data and either enable or disable it. This can be done with the Python code below.

from crontab import CronTab

# fetch_data.py is the name of the Python script used to fetch the data.
# this will select all jobs with 'fetch_data.py' in the crontab line.
command = 'fetch_data.py'
cron = CronTab(user='root') # crontab for the root user

jobs = cron.find_command(command)
enable = False

for job in jobs:
    job.enable(enable)
    job.minute.every(3) # just in case the schedule was changed
cron.write() # save changes to the crontab
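To sanity-check what python-crontab is doing, you can render the jobs back out: a disabled job is written to the crontab as a commented-out line. A quick inspection sketch (the rendered line in the comment is illustrative):

from crontab import CronTab

cron = CronTab(user='root')
for job in cron.find_command('fetch_data.py'):
    # after the job has been disabled, this prints False and
    # something like '# */3 * * * * /usr/bin/python3 /path/to/fetch_data.py'
    print(job.is_enabled(), str(job))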

Putting it Together

Smashing these two pieces together gives a basic script that can be used to determine whether any machines are currently fetching data for the stock data API and, if there are none, enable a cron job to ensure there are no gaps in the data. Additionally, we don’t want the same data to be inserted twice at slightly different times (seconds or milliseconds apart). Therefore, the job needs to be disabled if a different node comes back online, especially a node that doesn’t check for the connectivity of the other nodes.
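Disabling the job handles most of this, but it’s also worth making the inserts themselves idempotent in case two nodes briefly overlap. The API’s actual storage layer isn’t shown in this post, so the following is only a sketch, assuming a SQLite-style table with a uniqueness constraint on symbol and timestamp; the database path, table, columns, and values are all hypothetical:

import sqlite3

conn = sqlite3.connect('/path/to/stocks.db')  # hypothetical database file

# the UNIQUE constraint makes a second insert of the same
# (symbol, fetched_at) pair a no-op instead of a duplicate row
conn.execute("""CREATE TABLE IF NOT EXISTS quotes (
                    symbol TEXT,
                    fetched_at TEXT,
                    price REAL,
                    UNIQUE(symbol, fetched_at))""")

conn.execute("INSERT OR IGNORE INTO quotes VALUES (?, ?, ?)",
             ('AAPL', '2019-06-03 09:30:00', 175.32))
conn.commit()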

The Python script below will enable a cron job used to fetch data for the stock data API if the other nodes in the network are offline, and disable it otherwise.

from crontab import CronTab
import socket as sock
import datetime

# which hosts to check before we run the process. The first
# line of defense checks one host, the second node checks
# the first two, etc.
hosts = ['node1']   # hostnames of the hosts to check (can be IP addresses)
logfile = '/path/to/log.dat'
command = 'fetch_data.py'
cron = CronTab(user='root')

# log that we're starting up the script
with open(logfile, 'a') as f:
    f.write("Starting check script on {}\n".format(str(datetime.datetime.now().replace(microsecond=0))))

jobs = cron.find_command(command)

enable = False
success = False
for host in hosts:
    if success:  # another node was online, stop processing the loop
        break
    # a socket can't be reused for connect() after a previous attempt,
    # so create a fresh one (with a timeout) for each host
    s = sock.socket(sock.AF_INET, sock.SOCK_STREAM)
    s.settimeout(5)
    try:
        s.connect((host, 22))
        with open(logfile, 'a') as f:
            f.write('\tSuccessfully connected to ' + host + ', disabling cron job\n')
        enable = False
        success = True
    except OSError:
        with open(logfile, 'a') as f:
            f.write('\tUnable to connect to ' + host + ', scheduling cron job\n')
        enable = True
    finally:
        s.close()

# enable or disable the job depending on whether the other nodes are online
for job in jobs:
    job.enable(enable)
    job.minute.every(3)  # just in case the schedule was changed
cron.write()  # save changes to the crontab

# log that we're done
with open(logfile, 'a') as f:
    f.write("Ending check script on {}\n\n".format(str(datetime.datetime.now().replace(microsecond=0))))

This script is run every minute (via cron) to ensure the data is being fetched.
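For reference, the two crontab entries involved look something like the following. The interpreter and script paths are placeholders for wherever yours live, and check_nodes.py is just my name here for the watchdog script above:

# run the watchdog/check script every minute
* * * * * /usr/bin/python3 /path/to/check_nodes.py

# the data-fetching job that the check script enables and disables
*/3 * * * * /usr/bin/python3 /path/to/fetch_data.py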

Two main problems still persist in validating that the data is being gathered correctly: the data-gathering script failing to run correctly, and my home network going down. I’m still working on solutions to circumvent these issues. The best (maybe only) way to prevent the latter is to have the script running on a device that doesn’t rely on my home internet connection. For the former, I will probably develop some logic to determine whether the data is being inserted at the right frequency, as sketched below.
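I haven’t built that logic yet, but one possible shape for it is a freshness check: have fetch_data.py touch a timestamp file on each successful run (the stamp file below is hypothetical), and have the watchdog re-run the fetch if the stamp is older than the 3-minute schedule allows.

import datetime
import os
import subprocess

STAMP = '/path/to/last_fetch.stamp'      # hypothetical file touched by fetch_data.py
MAX_AGE = datetime.timedelta(minutes=4)  # one 3-minute cycle plus some slack

def fetch_is_stale():
    """Return True if the last successful fetch is too old (or never ran)."""
    try:
        mtime = datetime.datetime.fromtimestamp(os.path.getmtime(STAMP))
    except OSError:
        return True  # stamp file missing: treat as stale
    return datetime.datetime.now() - mtime > MAX_AGE

if fetch_is_stale():
    # re-run the fetch immediately instead of waiting for the next cron cycle
    subprocess.run(['/usr/bin/python3', '/path/to/fetch_data.py'])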

Expanding Redundancy to Multiple Machines

The logic above can be expanded to include more nodes. For my purposes, I chose a cascading type of redundancy: a primary node fetches the data, and if that node goes offline, a second device fetches the data via the script above. If the second device also goes offline, a third device is set up to determine whether any of the other nodes are running and to enable a cron job if not. To add more devices in this fashion, each node checks for connectivity of every node upstream of it, but not the nodes further downstream. This can be done by adding IPs or hostnames to the hosts list in the code above.

# which hosts to check before we run the process. The first
# line of defense checks one host, the second node checks
# the first two, etc.
hosts = ['node1', 'node2']

Full Code

from crontab import CronTab
import socket as sock
import datetime

# which hosts to check before we run the process. The first
# line of defense checks one host, the second node checks
# the first two, etc.
hosts = ['node1', 'node2']
logfile = '/home/amorast/Documents/backup_api/log.dat'
command = 'fetch_data.py'
cron = CronTab(user='root')

# log that we're starting up the script
with open(logfile, 'a') as f:
    f.write("Starting check script on {}\n".format(str(datetime.datetime.now().replace(microsecond=0))))

jobs = cron.find_command(command)

enable = False
success = False
for host in hosts:
    if success:  # another node was online, stop processing the loop
        break
    # a fresh socket (with a timeout) for each connection attempt
    s = sock.socket(sock.AF_INET, sock.SOCK_STREAM)
    s.settimeout(5)
    try:
        s.connect((host, 22))
        with open(logfile, 'a') as f:
            f.write('\tSuccessfully connected to ' + host + ', disabling cron job\n')
        enable = False
        success = True
    except OSError:
        with open(logfile, 'a') as f:
            f.write('\tUnable to connect to ' + host + ', scheduling cron job\n')
        enable = True
    finally:
        s.close()

# enable or disable the job depending on whether the other nodes are online
for job in jobs:
    job.enable(enable)
    job.minute.every(3)  # just in case the schedule was changed
cron.write()  # save changes to the crontab

# log that we're done
with open(logfile, 'a') as f:
    f.write("Ending check script on {}\n\n".format(str(datetime.datetime.now().replace(microsecond=0))))
