pypelined - stream and pipeline processing service

Documentation Status

Documentation Topics Overview:

The pypelined service and framework lets you build and deploy iterative processing pipelines. Using generator/coroutines with the chainlet library, it is trivial to create pipelines to fetch, process and transform streams of data. Configuration files are written using pure Python, allowing for maximum customization:

# this is a pure python configuration file
from chainlet import funclet, filterlet
from pypelined.conf import pipelines

# new pipeline processing element as simple python function
@funclet
def add_time(chunk):
    chunk['tme'] = time.time()
    return chunk

# new pipeline receiving process monitoring reports, modifying them, and sending them to another service
process_chain = Socket(10331) >> decode_json() >> filterlet(lambda value: value.get('rcode') == 0) >> \
    add_time() >> Telegraf(address=('localhost', 10332), name='valid_processes')
# add pipeline for deployment
pipelines.append(process_chain)
python -m pypelined myconfig.py

Contributing and Feedback

The project is hosted on github. If you have issues or suggestion, check the issue tracker: Open Issues For direct contributions, feel free to fork the development branch and open a pull request.

Indices and tables


Documentation built from chainlet 0.1.3 at Nov 15, 2017.