Friday, September 2, 2016

CURIO and Websocketd

Quote from site :

Curio is a library for performing concurrent I/O and common system programming tasks such as launching subprocesses and farming work out to thread and process pools. It uses Python coroutines and the explicit async/await syntax introduced in Python 3.5. Its programming model is based on cooperative multitasking and existing programming abstractions such as threads, sockets, files, subprocesses, locks, and queues. You'll find it to be small and fast.

Full duplex messaging between web browsers and servers

Run the Python script below with the following command:
./websocketd --port=8080 python

Put the following code in an HTML page,
then enter, for example, ws.send('/home') in the web browser console (the script passes the received text to ls as the path to list):

<!DOCTYPE html>
    <title>websocketd unix command example</title>
      #count {
        font:  16px arial;
        margin: auto;
        color: #00FF00;
    <div style="color: #fff;">Python Terminal</div>
    <div id="count" style="height:500px; overflow:auto; border:1px solid black;"></div>

      data = []

      function clearDiv() {
        var div = document.getElementById('count');
        div.innerHTML = "";

      var ws = new WebSocket('ws://localhost:8080'  + "/");
      data = []
      ws.onopen = function() { = '#000';
      ws.onclose = function() { = null;
      ws.onmessage = function(event) {
        data = []
        document.getElementById('count').innerHTML = data[0];

</html> (modified version from the examples site)
import curio
from sys import stdout
from curio import subprocess

async def main():
    """Answer each path received on stdin with an HTML-formatted ``ls -rtl`` listing.

    The script is meant to run under websocketd, which forwards every
    WebSocket message as one line on stdin and sends every line written to
    stdout back to the browser. Each received line is used as the path to
    list; the listing is printed as a single line with ``<br />`` in place
    of newlines so the page can drop it straight into ``innerHTML``.

    The loop ends when stdin is closed.
    """
    # Local import keeps this block self-contained; html is standard library.
    import html

    while True:
        try:
            # NOTE: input() blocks the whole kernel, which is acceptable here
            # because this is the only task running.
            path = input()
        except EOFError:
            # stdin closed: nothing more to serve.
            break
        # "--" stops ls from treating a client-supplied path such as "-R" as
        # an option; the argument list form never goes through a shell.
        p = subprocess.Popen(["ls", "-rtl", "--", path], stdout=subprocess.PIPE, bufsize=1)
        chunks = []
        async for line in p.stdout:
            chunks.append(line)
        # Reap the child so finished ls processes do not linger as zombies.
        await p.wait()
        # File names are arbitrary bytes: never crash on invalid UTF-8.
        listing = b"".join(chunks).decode('UTF-8', errors='replace')
        # Escape before inserting markup: file names may contain <, > or &,
        # and the page assigns this text to innerHTML.
        # flush=True because stdout is a pipe and would otherwise be
        # block-buffered, so the browser would not see the reply.
        print(html.escape(listing).replace('\n', '<br />'), flush=True)

Monday, June 27, 2016

GFK - Memory

First game draft: Memory.

This is the first game created with Transcrypt and Hexi.
Feel free to comment if you would like more details on the installation.
# This file is translated to Javascript using Transcrypt
rgb2hex = JS.rgb2hex

# The eight card colours, each converted once through rgb2hex.
colors = [
    rgb2hex(rgba)
    for rgba in (
        "rgba(255,0,0,0)",
        "rgba(255,255,0,0)",
        "rgba(255,255,255,0)",
        "rgba(255,0,255,0)",
        "rgba(255,80,80,0)",
        "rgba(255,128,0,0)",
        "rgba(255,128,255,0)",
        "rgba(255,0,128,0)",
    )
]
# Keep the individual names available alongside the list.
color1, color2, color3, color4, color5, color6, color7, color8 = colors

# Every colour listed twice in a row: [color1, color1, color2, color2, ...].
allcolors = [shade for shade in colors for _copy in range(2)]

# Helper Functions:
def all(iterable):
    """Return True when no element of *iterable* is falsy (True if it is empty).

    Iteration stops at the first falsy element.
    """
    for item in iterable:
        if item:
            continue
        return False
    return True

# Main grid, which contains all the cells.
# NOTE(review): several lines of this class look truncated by extraction (the
# `self.offset = 4 = game` line and the two `... =, 128, ...)` lines are not
# valid Python); the missing text must be restored from the original source.
class Grid:
    # Stores the grid dimensions and two 2-D lists, `ligs` and `spr`, both
    # filled with 0 placeholders that display() later overwrites.
    def __init__(self, game, rows=4,  cols=4):
        self.offset = 4 = game
        self.rows = rows
        # NOTE(review): the `cols` argument is ignored; `rows` is stored
        # instead. Confirm whether `cols` was intended here.
        self.cols = rows
        self.ligs = [[0 for i in range(self.rows)] for j in range(self.cols)]
        self.spr = [[0 for i in range(self.rows)] for j in range(self.cols)]

    # Creates, for each cell number, a front-face object and a back-face
    # object at the same pixel position and records them in `spr` and `ligs`.
    def  display(self):
        # NOTE(review): `numcells` is computed but not used; the loop below
        # is hard-coded to 16 cells regardless of rows/cols.
        numcells = self.rows * self.cols
        cells = range(16)
        for num in cells:
            color = allcolors[num]
            # NOTE(review): `j` divides by rows while `i` wraps on cols; the
            # two only agree for a square grid -- confirm the intended layout.
            i, j = num % self.cols, num // self.rows
            # Cells are spaced 128 + offset units apart on both axes.
            posx, posy  = i *(128  + self.offset ), j * (128  + self.offset ) 

            # Front face: sprite or colour
            sprite =, 128, color)
            sprite.x = posx
            sprite.y = posy
            sprite.num = num
            sprite.content  = color
            sprite.showed = False

            # Back face
            rectb =, 128, "blue")
            rectb.x = posx
            rectb.y = posy
            rectb.num = num
            self.ligs[i][j] = rectb
            self.spr[i][j] = sprite

# Game handler.
# NOTE(review): many lines of this class look truncated by extraction (dangling
# `=` signs, unclosed calls, empty method bodies), so it is not valid Python as
# shown. The comments below describe only what the remaining code does.
class Memory:
    # Sets up the tap handler, the Grid and the click bookkeeping.
    def __init__(self, width=512, height=512): = hexi(width, height, self.setup) = "seagrean"
        self.mouse =
        # Route taps reported by the mouse object to self.tap().
        self.mouse.tap = self.tap
        self.grid = Grid(
        self.curcell = None
        self.clickedcells = []

    # NOTE(review): this sets `tapped` on the Memory instance, while play()
    # reads and clears `self.mouse.tapped` -- confirm which object is meant.
    def tap(self):
        self.tapped = True

    def setup(self):       "seaGreen")
        self.grid.display() =

    # Walks every back-face cell in grid.ligs and remembers in self.curcell
    # the one that passes the (truncated) test below.
    def get_curcell(self):
        for i in range(self.grid.rows):
            for j in range(self.grid.cols):
                curcell = self.grid.ligs[i][j]
                if(, curcell)):
                    self.curcell = curcell      

    # Compares the first two clicked cells through the sprites stored at
    # their grid positions.
    def compare_cells(self):

        # NOTE(review): the body of this guard is missing (presumably an
        # early return when fewer than two cells are clicked -- confirm).
        if len(self.clickedcells) < 2:

        numrows = self.grid.rows
        numcols = self.grid.cols 

        # Returns a callback that sets alpha back to 1 on both cells and
        # empties the clicked list.
        def resetcell(cells):
            def _reset():
                cells[0].alpha = 1
                cells[1].alpha = 1
                self.clickedcells = []
            return _reset 

        cella, cellb = self.clickedcells[:2]
        if (cella.num != cellb.num ):                
                # Same number-to-index mapping as Grid.display().
                icella, jcella = cella.num % numcols, cella.num // numrows
                icellb, jcellb = cellb.num % numcols, cellb.num // numrows
                spritea = self.grid.spr[icella][jcella]
                spriteb  = self.grid.spr[icellb][jcellb]
                contenta = spritea.content 
                contentb = spriteb.content                     
                cellb.alpha = 0
                cella.alpha = 0                     
                if (contenta != contentb):
                    # Schedule the reset callback with a delay argument of 1000.
                    setTimeout(resetcell([cella, cellb]), 1000)
                    # NOTE(review): the next three lines run in the
                    # "contents differ" branch as written; an `else:` may
                    # have been lost before them -- confirm.
                    spritea.showed = True
                    spriteb.showed = True
                    self.clickedcells = []

    def check_endgame(self):
        # Flatten the 2-D list of sprites into a single list.
        lst_spr = [sprite for liste_sprites in self.grid.spr for sprite in liste_sprites]
        showed_values = [s.showed for s in lst_spr]
        # When every sprite is flagged as showed, set alpha to 0 on all of them.
        if (all(showed_values)):
            for s in lst_spr:
                s.alpha = 0
   = self.end 

    # On a tap: warns when two or more cells are already recorded as clicked,
    # then clears the tapped flag.
    def play(self):

        if (self.mouse.tapped): 
            lc = len(self.clickedcells)
            if (lc >= 2):
                alert("TOO FAST")
            self.mouse.tapped = False

    # NOTE(review): the bodies of start() and end() are missing.
    def start(self):

    def end(self):

memory = Memory()

Games For Kids - GFK

I have just created a new repository on GitHub named GFK, short for Games For Kids.

Transcrypt translates Python to JavaScript. The resulting JavaScript is clean and readable.
Hexi is a game library based on PixiJS.

Saturday, October 24, 2015

Running multiple python apps with nginx and uwsgi in emperor mode

Quote from site

(...)This is a recipe on how to easily run multiple Python web applications using uwsgi server (in emperor mode) and behind nginx. Most existing docs and blogs would show how to manually start uwsgi to run a single app. In this post, I'll show how to configure uwsgi as a system service (with upstart) capable of serving multiple python WSGI compliant web applications by simply placing them in a standard location and adding an standard xml file.(...)

Wednesday, October 14, 2015

Turn a function into asynchronous mode
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from threading import Thread
from time import sleep

# Decorator class: wraps a function so that calling it records the function
# and its arguments on this Thread object; run() then calls the function.
# NOTE(review): `async` is a reserved word since Python 3.7, so this class
# name only parses on older interpreters.
class async(Thread):

    # callback / errback are stored for later use by the thread.
    def __init__(self, callback, errback):
        super(async, self).__init__()
        self.callback = callback
        self.errback = errback

    def __call__(self, func):
        # When the decorated function is called, we only capture the function
        # and its arguments, and start our thread.
        # NOTE(review): no self.start() call is visible in wrapper(); it was
        # presumably lost in extraction -- confirm against the original.
        def wrapper(*args, **kwargs):
            self.func = func
            self.args = args
            self.kwargs = kwargs
        return wrapper

    # NOTE(review): the `try:` line, the use of `retval` and the body of the
    # `except` clause are missing here; presumably self.callback(retval) and
    # self.errback(err) were called -- confirm against the original.
    def run(self):
            retval = self.func(*self.args, **self.kwargs)
        except Exception as err:

def my_callback(retval):
    """Report the value returned by a function that completed successfully.

    Further processing of the returned value can be added here. This code
    is called by the worker thread, so it cannot block the main thread.

    :param retval: the value returned by the wrapped function.
    """
    # One pre-formatted argument prints the same text under the Python 2
    # print statement and the Python 3 print() function; the original
    # `print "CALLBACK:", retval` is a syntax error on Python 3.
    print("CALLBACK: %s" % (retval,))

def my_errback(err):
    """Report an exception raised by the wrapped function.

    The exception can be re-raised here or simply ignored. Like the success
    callback, this code is called in the worker thread.

    :param err: the exception instance that was caught.
    """
    # One pre-formatted argument prints the same text under the Python 2
    # print statement and the Python 3 print() function; the original
    # `print "ERRBACK", err` is a syntax error on Python 3.
    print("ERRBACK %s" % (err,))

# Example of a function that succeeds: it returns its argument unchanged.
@async(my_callback, my_errback)
def ma_fonction_reussie(n):
    return n

@async(my_callback, my_errback)
def ma_fonction_foireuse(n):
    raise AttributeError("Got %s, expected 'foo'" % n)


# Just to pass the time in the main thread in the meantime...
for i in range(10):
    # One pre-formatted argument prints the same text under the Python 2
    # print statement and the Python 3 print() function.
    print("MAIN: %s" % i)

Remote command with Paramiko-Expect
#!/usr/bin/env python
# PyNet Class Exercises by Nick Ellson
__author__ = "Nick Ellson"
import paramiko
from paramikoe import SSHClientInteraction
import getpass

# Collect a target, user, and password. This example assumes that privilege level 15 is on your Cisco VTY.
ip = input("Enter Host: ")
username = input("Enter Username: ")
password = getpass.getpass()

# Initialize the Paramiko connection.
# NOTE(review): no host-key setup (load_system_host_keys() or
# set_missing_host_key_policy()) is visible before connect(); it may have been
# lost in extraction -- confirm, since connecting to a host with an unknown key
# is normally rejected by Paramiko.
remote_conn_pre = paramiko.SSHClient()
remote_conn_pre.connect(ip, username=username, password=password, allow_agent=False, look_for_keys=False)
print ("SSH connection established to %s" % ip)

# Here we make a simple Paramiko-only shell because we do not yet have a prompt example to work with for Expect.
# So this will basically log us in, and grab the prompt that we land with, and disconnect.
# NOTE(review): only the first 1000 bytes received are read and treated as the
# prompt, and no disconnect call is visible before the reconnect below.
remote_conn = remote_conn_pre.invoke_shell()   
output = remote_conn.recv(1000).decode("utf-8")
prompt = output.strip()

# Now we reconnect with the paramiko expect module and we have a nice interactive conversation.
remote_conn_pre.connect(ip, username=username, password=password, allow_agent=False, look_for_keys=False)
interact = SSHClientInteraction(remote_conn_pre, timeout=20, display=False)

# Logged in, wait for the prompt to display.
# NOTE(review): no interact.expect(prompt) call is visible here or after the
# send() calls below, and `prompt` is never used; the waits were presumably
# lost in extraction -- confirm against the original.
# Shut off the --more-- prompting.
interact.send('terminal length 0')
# We don't care about the output for that last command, so just clear the buffer.
cmd_output = interact.current_output_clean
# Let's grab something BIG that used to be a real timing issue the old way, not knowing how long it might take to get a full
# running config over a WAN link. Now we do not care, we will simply wait until the prompt shows up.
interact.send('show running-config')
cmd_output = interact.current_output_clean
print (cmd_output)

# Close our session.
# NOTE(review): no close() call is visible after this comment.

Monday, October 12, 2015

Spinner - AsyncIO example

Nice example using asyncio in Python3.5

# credits: Example by Luciano Ramalho inspired by
# Michele Simionato's multiprocessing example in the python-list:

import asyncio
import itertools
import sys

async def spin(msg):
    """Animate a text spinner followed by *msg* on stdout until cancelled.

    A new frame (one of ``| / - \\`` plus the message) is drawn every 0.1
    second, each overwriting the previous one in place. Cancelling the task
    is the normal way to stop it: the last frame is blanked out and the
    coroutine returns instead of propagating ``CancelledError``.

    :param msg: text displayed to the right of the spinning character.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        # Backspaces move the cursor back to the start of the frame so the
        # next frame is drawn over it.
        write('\x08' * len(status))
        try:
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            # Cancellation requested: leave the animation loop.
            break
    # Erase the last frame with spaces and move the cursor back again.
    write(' ' * len(status) + '\x08' * len(status))
    flush()

async def slow_function(iterations=100000000, delay=.0000001):
    """Pretend to wait a long time for I/O, then return 42.

    The wait is made of many tiny sleeps, so control goes back to the event
    loop on every iteration and other tasks can run in between. A single
    ``await asyncio.sleep(3)`` is the simpler alternative.

    :param iterations: number of sleeps to perform; the default keeps the
        original hard-coded count.
    :param delay: duration of each sleep in seconds; the default keeps the
        original hard-coded value.
    :returns: the constant 42.
    """
    for _ in range(iterations):
        await asyncio.sleep(delay)
    return 42

async def supervisor():
    """Run slow_function() while a spinner task animates, and return its result.

    The spinner is scheduled as a separate task, its task object is printed,
    and the spinner is cancelled once slow_function() is finished.

    :returns: whatever slow_function() returns.
    """
    spinner = asyncio.ensure_future(spin('thinking!'))
    print('spinner object:', spinner)
    try:
        result = await slow_function()
    finally:
        # Cancel in `finally` so the spinner is also stopped when
        # slow_function() raises or this coroutine is itself cancelled;
        # otherwise the spinner task would be left running.
        spinner.cancel()
    return result

def main():
    """Drive supervisor() to completion on the event loop and print its result."""
    loop = asyncio.get_event_loop()
    try:
        result = loop.run_until_complete(supervisor())
    finally:
        # Release the loop's resources whether or not supervisor() succeeded.
        loop.close()
    print('Answer:', result)

if __name__ == '__main__':
    # The guard had no body, which is a syntax error; run the entry point.
    main()

Friday, September 25, 2015

Reading large GZIP files with Python

Thanks to
Spencer Rathbun

Here is Spencer's way to read large files with Python. Amazingly, I had adopted nearly the same strategy for one of my scripts at work... Thanks to Dave Beazley for his fantastic work on generators in Python.

#!/usr/bin/env python
import gzip, bz2
import os
import fnmatch

def gen_find(filepat,top):
    """Yield the path of every file below *top* whose name matches *filepat*.

    *filepat* is a shell-style wildcard pattern as understood by fnmatch;
    the directory tree is walked with os.walk and paths are produced lazily.
    """
    for dirpath, _subdirs, filenames in os.walk(top):
        matching = fnmatch.filter(filenames, filepat)
        for filename in matching:
            yield os.path.join(dirpath, filename)

def gen_open(filenames):
    """Yield an open file object for each name, picking the opener by extension.

    Names ending in ``.gz`` are opened with gzip, names ending in ``.bz2``
    with bz2, and everything else with the built-in open(). Files are opened
    lazily, one per iteration step. The yielded handles are not closed here;
    that is left to the consumer.

    :param filenames: iterable of file paths.
    """
    for name in filenames:
        if name.endswith(".gz"):
            # Restored branch: the `if` had no body.
            yield gzip.open(name)
        elif name.endswith(".bz2"):
            yield bz2.BZ2File(name)
        else:
            # Restored `else:`; without it every .bz2 file was also opened a
            # second time as plain text.
            yield open(name)

def gen_cat(sources):
    """Chain several iterables into one stream, yielding their items in order.

    Each source is only started once the previous one is exhausted.
    """
    flattened = (item for source in sources for item in source)
    for item in flattened:
        yield item

def main(regex, searchDir):
    """Print every line of every file under *searchDir* whose name matches *regex*.

    The work is a lazy pipeline: gen_find() locates the files, gen_open()
    opens them and gen_cat() concatenates their lines.

    :param regex: file-name pattern handed to gen_find(); despite the
        parameter name it is matched with fnmatch, i.e. as a shell-style
        wildcard.
    :param searchDir: directory to search.
    """
    fileNames = gen_find(regex, searchDir)
    fileHandles = gen_open(fileNames)
    fileLines = gen_cat(fileHandles)
    for line in fileLines:
        # print(line) with a single argument behaves the same under the
        # Python 2 print statement and the Python 3 function; the original
        # `print line` is a syntax error on Python 3.
        print(line)

if __name__ == '__main__':
    # argparse is used below but is not among the imports at the top of this
    # script, so it is imported here.
    import argparse

    parser = argparse.ArgumentParser(description='Search globbed files line by line')
    # The ArgumentParser(version=...) keyword was deprecated and later removed
    # from argparse; an explicit "version" action provides the same
    # -v/--version flag on old and new interpreters.
    parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.0')
    # nargs='?' makes the positionals optional so that their defaults can
    # actually apply; a required positional never uses its default.
    parser.add_argument('regex', nargs='?', type=str, default='*', help='Regular expression')
    parser.add_argument('searchDir', nargs='?', type=str, default='.', help='list of input files')
    args = parser.parse_args()
    main(args.regex, args.searchDir)