from redis import Redis
import redis.exceptions as exceptions
import logging
import time
import threading
logging.basicConfig(level=logging.INFO)
def writeMessage(streamName):
"""Starts a loop writing the current time and thread name to 'streamName'
:param streamName: Stream (key) name to write messages.
"""
fieldsDict={'writerId':threading.currentThread().getName(),'myvalue':None}
while True:
fieldsDict['myvalue'] = time.ctime(time.time())
redis.xadd(streamName,fieldsDict)
time.sleep(1)
def readMessage(groupName=None,streamName=None):
"""Starts a loop reading from 'streamName'
Multiple threads will read from the same stream consumer group. Consumer group is used to coordinate data distribution.
Once a thread acknowleges the message, it won't be provided again. If message wasn't acknowledged, it can be served to another thread.
:param groupName: stream group were multiple threads will read.
:param streamName: Stream (key) name where messages will be read.
"""
readerID=threading.currentThread().getName()
while True:
try:
# Check if the stream has any message
if redis.xlen(streamName)>0:
# Check if if the messages are new (not acknowledged) or not (already processed)
streamData=redis.xreadgroup(groupName,readerID,{streamName:'>'},count=1)
if len(streamData) > 0:
msgId,message = streamData[0][1][0]
logging.info("{}: Got {} from ID {}".format(readerID,message,msgId))
#Do some processing here. If the message has been processed sucessfuly, acknowledge it and (optional) delete the message.
redis.xack(streamName,groupName,msgId)
logging.info("Stream message ID {} read and successfuly by {}".format(msgId,readerID))
redis.xdel(streamName,msgId)
else:
pass
except:
raise
time.sleep(0.5)
# Creates the stream 'mystream' and consumer group 'myworkergroup' where multiple threads will write/read.
try:
redis.xgroup_create('mystream','myworkergroup',mkstream=True)
except exceptions.ResponseError as e:
logging.info("Consumer group already exists. Will continue despite the error: {}".format(e))
except:
raise
# Starts 5 writer threads.
for writer_no in range(5):
writerThread = threading.Thread(target=writeMessage, name='writer-'+str(writer_no), args=('mystream',),daemon=True)
writerThread.start()
# Starts 10 reader threads
for reader_no in range(10):
readerThread = threading.Thread(target=readMessage, name='reader-'+str(reader_no), args=('myworkergroup','mystream',),daemon=True)
readerThread.daemon = True
readerThread.start()
# Keep the code running for 30 seconds
time.sleep(30)
python ReadWriteStream.py
If the program cannot run, use the following command:
python "ReadWriteStream.py file path"