pypelined - stream and pipeline processing service¶
The pypelined
service and framework lets you build and deploy iterative processing pipelines.
Using generator/coroutines with the chainlet
library, it is trivial to create pipelines to fetch, process and transform streams of data.
Configuration files are written using pure Python, allowing for maximum customization:
# this is a pure python configuration file
from chainlet import funclet, filterlet
from pypelined.conf import pipelines
# new pipeline processing element as simple python function
@funclet
def add_time(chunk):
chunk['tme'] = time.time()
return chunk
# new pipeline receiving process monitoring reports, modifying them, and sending them to another service
process_chain = Socket(10331) >> decode_json() >> filterlet(lambda value: value.get('rcode') == 0) >> \
add_time() >> Telegraf(address=('localhost', 10332), name='valid_processes')
# add pipeline for deployment
pipelines.append(process_chain)
python -m pypelined myconfig.py
Contributing and Feedback¶
The project is hosted on github. If you have issues or suggestion, check the issue tracker: For direct contributions, feel free to fork the development branch and open a pull request.