Today, many non-profit organizations depend on social media platforms to recruit volunteers for their events. This usually means a lot of manual spreadsheet work, which is time-consuming and hard to manage when a large volume of interest arrives in a short time span. And since the organization keeps no database of its own, data is often lost and returning volunteers can't be tracked. The manual process is also an obstacle to planning and preparation. On top of that, there is no reliable way to get further updates to volunteers: social posts get lost in the sea of notifications, and reaching out to everyone through their personal inboxes is not feasible without software to do it.
This project is a distributed, event-driven application that provides real-time intent classification for recruiting volunteers from social media platforms and keeps a channel of communication open with them.
The diagram above depicts the flow between the system's parts:
- Nonprofit Admin creates a "signup" post
- Voluntree enqueues a task
- A worker picks up the task
- Worker publishes the post on Facebook
- Facebook user creates a comment
- Voluntree receives the comment with a webhook callback
- The webhook callback enqueues a preprocessing task
- A worker picks it up and turns the comment into an embedding (requires disk I/O)
- Worker puts a message (the embedding and the original comment) in a stream
- A gear reads messages from the stream (in batch)
- Gear triggers a model run in RedisAI for the entire batch of messages
- RedisAI detects whether the commenter is interested in volunteering
- Gear writes the message (model run output and the original comment) to another stream
- A worker polls the stream for new comments
- It writes them to the database (in bulk)
- It sends an appropriate reply to the FB user
- It writes a message to the correct channel so that consumers listening to the group get updates in real time
- Nonprofit Admin sees signups in real time via WebSocket updates
Beyond using Redis as a cache and for a few other data structures, we relied mainly on:
- Lists (for task queue) and Pub/Sub (as backend for distributed real-time communication)
Lists and Pub/Sub
We use Celery, a distributed task queue, for both real-time processing and task scheduling. Celery communicates via messages, using a broker to mediate between clients and workers, and we use Redis as Celery's message transport. This takes advantage not only of Lists for implementing task queues but also of the pattern-based Pub/Sub architecture to coordinate between workers, along with some other common data types like Sets.
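Conceptually, the broker side of this is just a Redis List per queue: clients LPUSH serialized tasks onto the left and workers BRPOP them off the right, giving FIFO delivery. A stdlib-only sketch of that round trip (the task names are illustrative, not Voluntree's actual tasks):

```python
import json
from collections import deque

# Stand-in for a Redis List: LPUSH appends on the left, BRPOP pops from
# the right, so tasks reach workers in the order they were enqueued.
queue = deque()

def lpush(task_name, **kwargs):
    queue.appendleft(json.dumps({"task": task_name, "kwargs": kwargs}))

def brpop():
    return json.loads(queue.pop()) if queue else None

# A client enqueues two tasks; a worker drains them in FIFO order.
lpush("publish_post", post_id=42)
lpush("send_reply", comment_id=7)

first = brpop()   # the publish_post task
second = brpop()  # the send_reply task
```

Celery layers retries, scheduling, and result tracking on top, but the underlying transport is this simple.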
We need several queues for this:
- Publishing Queue (posts a status to Facebook)
- Comment Preprocessing Queue (converts a comment (i.e. text) to a Tensor (i.e. numbers))
- Reply Queue (posts a reply or private message to a Facebook user)
- Notification Queue (sends private message to all signed up Facebook users for an event update)
- DB Write Queue (writes new comments from a stream to our Database in bulk)
- Email Queue (sends verification email before creating any 3rd party integration account NEW!)
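With Celery, each of these queues maps to its own Redis List via task routing. A hedged sketch of what such a routing table could look like (the task module paths and queue names here are hypothetical, not taken from the project):

```python
# Hypothetical Celery task_routes: each task name is pinned to one of the
# queues above, and each queue is backed by its own Redis List.
task_routes = {
    "tasks.publish_post":            {"queue": "publishing"},
    "tasks.preprocess_comment":      {"queue": "comment_preprocessing"},
    "tasks.send_reply":              {"queue": "reply"},
    "tasks.notify_signups":          {"queue": "notification"},
    "tasks.bulk_write_comments":     {"queue": "db_write"},
    "tasks.send_verification_email": {"queue": "email"},
}
```

Separate queues let slow work (like bulk DB writes) back up without delaying latency-sensitive work (like replies).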
We use Django Channels, which allows Django projects to handle not only HTTP but also protocols that require long-running connections - in our case, WebSockets. It bundles an event-driven architecture with "channel layers", a system that allows us to easily communicate between processes.
channels_redis is the only official Django-maintained channel layer supported for production use. The layer uses Redis as its backing store and leverages Sorted Sets, Lists, etc. Right now we have only one group of consumers, which listens for new signups, but the list will grow as we implement more real-time features.
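Wiring channels_redis in is a one-dict settings change; a sketch (the host and port are illustrative):

```python
# settings.py sketch: channels_redis as the channel layer backing store.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
        },
    },
}
```

A WebSocket consumer then joins a group (e.g. one per nonprofit admin), and any worker process can broadcast to that group through the channel layer's `group_send`, with Redis carrying the message between processes.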
Together, these pieces form the backbone of a distributed real-time application.
Hands down, RedisAI was "Thor's Stormbreaker for deep learning deployment".
We defined our machine learning model using TensorFlow. Once we were happy with the results, we "froze" the model to a frozen_graph.pb file using the following piece of code:

```python
import tensorflow as tf
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2

# Save model to SavedModel format
tf.saved_model.save(model, "tmp/models")

# Convert Keras model to ConcreteFunction
full_model = tf.function(lambda x: model(x))
full_model = full_model.get_concrete_function(
    tf.TensorSpec(model.inputs[0].shape, model.inputs[0].dtype))

# Get frozen ConcreteFunction
frozen_func = convert_variables_to_constants_v2(full_model)
frozen_func.graph.as_graph_def()
layers = [op.name for op in frozen_func.graph.get_operations()]

# Save frozen graph from frozen ConcreteFunction to hard drive
tf.io.write_graph(graph_or_graph_def=frozen_func.graph,
                  logdir="tmp/frozen_models",
                  name="frozen_graph.pb",
                  as_text=False)
```
We struggled with this piece in particular because we couldn't find any documentation on freezing graphs in TensorFlow 2.1.x.
Then we created a script to load the model into memory, and another script to test whether the model produced the correct response, since the frozen graph would be executed on the TensorFlow 1.15 backend whereas it had been developed with TensorFlow 2.1.x. In that script we made use of the RedisAI commands:

```python
import numpy as np

def model_run(embedding):
    MODEL = 'model:interestnet'
    INPUT = 'tensor:input-interestnet'
    OUTPUT = 'tensor:output-interestnet'

    # Write the input tensor, run the model on it, then read the output back
    res = conn.tensorset(INPUT, embedding['embeding'], shape=(1, 44), dtype='float')
    res = conn.modelrun(MODEL, inputs=[INPUT], outputs=[OUTPUT])
    res = conn.tensorget(OUTPUT, as_numpy=True)

    # The predicted class is the one with the highest score
    index = np.argmax(res)
    res = True if index == target_class else False
    return res
```
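The boolean that model_run returns comes from an argmax over the model's per-class scores. Isolated from the Redis plumbing (target_class here is a hypothetical index for the "interested" class), the decision step is just:

```python
import numpy as np

def decide(model_output, target_class):
    # The model emits one score per class; argmax picks the winning
    # class, and we report interest only when it matches the target.
    return int(np.argmax(model_output)) == target_class

decide(np.array([[0.1, 0.9]]), target_class=1)  # classified as interested
```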
When we were satisfied with the results, we integrated RedisGears to take advantage of its C interface to RedisAI.
Our "Comment Preprocessing Queue" acts as producer and adds a preprocessed comment (as a list of numbers) to a Redis Stream. The new comment triggers the execution of a RedisGear. This gear prepares the input to the model's requirements (i.e. creates a Tensor from the lists of numbers it consumed from the stream) and calls Redis AI to execute an intent classification model on the comment. Then it stores the model's outputs (i.e. whether the intent of the comment is to sign up as a volunteer) in another Redis Stream.
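End to end, the hand-off through the two streams looks like this. A stdlib-only sketch with stand-ins for XADD and the RedisAI model run (the stream keys, the example embedding, and the threshold "classifier" are all made up for illustration):

```python
import json

# Stand-in for Redis Streams: each stream is a list of (id, fields) entries.
streams = {"stream:comments": [], "stream:signups": []}
_next_id = 0

def xadd(stream, **fields):
    global _next_id
    _next_id += 1
    entry_id = f"{_next_id}-0"
    streams[stream].append((entry_id, fields))
    return entry_id

# Producer (the preprocessing worker): the comment text plus its embedding.
xadd("stream:comments",
     comment="count me in!",
     embedding=json.dumps([1, 0, 0, 1]))

def classify(embedding):        # stand-in for the RedisAI model run
    return sum(embedding) >= 2  # hypothetical decision rule

# Consumer (the gear): read a batch, keep only the interested commenters.
for entry_id, fields in streams["stream:comments"]:
    if classify(json.loads(fields["embedding"])):
        xadd("stream:signups", comment=fields["comment"])
```

The real gear does the same shuttling, with RedisGears handling the batch reads and RedisAI the classification.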
Interfacing RedisGears with RedisAI was the most challenging integration. There was very little documentation and only a few examples of how to do it. The examples all used a library named redisAI that we couldn't find anywhere on the internet. After much struggle we learned that this library is made available automatically in the RedisGears Python runtime environment, and that it is a C-language interface to RedisAI. The module itself has no public documentation, so we had to read through the open-source code of RedisGears line by line to figure out how to achieve what we wanted.
All the examples on the internet used Blobs (createTensorFromBlob) as model input, but in our case the input was a list of floating-point numbers. We tried to follow the Blob examples but found that executing a TensorFlow model from RedisGears yielded outputs that were wrong and different from those of the Redis CLI, whereas the CLI output matched that of native TensorFlow. This resulted in a discussion on the Redis Community. Meanwhile, we looked through the source code and found another API, createTensorFromValues, which suited our use case better but had a bug in it. We found some issues around that particular function and immediately opened a PR to fix it. @meirsh from the Redis team was kind enough to review and merge the PR in the very late hours of the night. After all this hassle, we finally got a gear up and running that listens to a stream for input and runs it through the model to decide on an outcome. This is a sample working version of the code we ended up with:
```python
from redisgears import executeCommand as execute
import redisAI


def xlog(*args):
    # Write debug output to a Redis Stream, since print() isn't
    # visible from inside the RedisGears runtime
    execute('XADD', 'logs', '*', 'msg', ' '.join(map(str, args)))


def model_run():
    input = [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
             0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1,
             0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0,
             0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]

    # Build a (1, 44) float tensor from the raw values
    input_tensor = redisAI.createTensorFromValues('FLOAT', [1, 44], iter(input))

    # Run the model and read its output back as a flat list
    runner = redisAI.createModelRunner('mymodel')
    redisAI.modelRunnerAddInput(runner, 'x', input_tensor)
    redisAI.modelRunnerAddOutput(runner, 'Identity')
    model_replies = redisAI.modelRunnerRun(runner)
    model_output = redisAI.tensorToFlatList(model_replies)

    xlog("model_run model_output", model_output)


model_run()
GearsBuilder().run()
```
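The sample above runs the model once with a hard-coded input; in the deployed gear, the same logic is registered on the comments stream so RedisGears triggers it for each batch of entries. A hedged sketch of the registration (the stream key and batch size are assumptions, model_run would take the stream record as an argument, and this code only runs inside the RedisGears runtime):

```python
# Register a gear on the preprocessed-comments stream; RedisGears
# invokes the function with stream entries as they arrive, batching
# them up to the configured size.
GearsBuilder('StreamReader') \
    .foreach(model_run) \
    .register('stream:comments', batch=10)
```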
We use two main streams (besides some logging and minor use cases).
Preprocessed comments are put into the first stream by a worker, and RedisGears reads messages from it in batches. It then feeds the entire batch into a machine learning model, which generates outputs in batch. To a second stream, RedisGears writes out the positive outcomes (i.e. the comments where people expressed interest in signing up as volunteers). Another worker reads from this stream, writes the comments to the database in bulk, and initiates related tasks like sending updates to the channel layer, sending a reply or message to the Facebook commenter, etc.
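The bulk write in that last step is a simple batch-and-flush loop. A minimal sketch with a stand-in for the real ORM call (the batch size and the `bulk_create`-style helper are assumptions, not the project's actual code):

```python
BATCH = 3
pending, flushes = [], []

def bulk_write(rows):
    # Stand-in for e.g. Comment.objects.bulk_create(rows): one round
    # trip to the database per batch instead of one per comment.
    flushes.append(list(rows))

for comment in ["a", "b", "c", "d", "e"]:  # entries read off the signups stream
    pending.append(comment)
    if len(pending) >= BATCH:
        bulk_write(pending)
        pending = []

if pending:  # flush whatever is left at the end
    bulk_write(pending)
```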