import socket

try:
    import thread
except ImportError:
    # Python 3 renamed the low-level ``thread`` module to ``_thread``.
    import _thread as thread


class BufferOverload(Exception):
    """Raised when the buffer's size exceeds config.max_events_in_buffer."""


class OutChannel(object):
    """Buffered, lock-protected sender of commands over an HTTP channel.

    Commands are queued in ``self.buffer`` and handed to the current
    ``httpchannel`` in FIFO order.  Commands that cannot be sent because of a
    ``socket.error`` stay in the buffer and are retried on a later push.
    """

    def __init__(self, config, httpchannel):
        """Store the configuration and the initial channel.

        config -- object providing ``max_events_per_request`` and
                  ``max_events_in_buffer``.
        httpchannel -- object providing ``push_with_producer(command)`` and
                       ``close_when_done()`` (presumably an asynchat-style
                       channel; confirm against the caller).
        """
        self.config = config
        self.httpchannel = httpchannel
        # Commands accepted but not yet handed to the channel, oldest first.
        self.buffer = []
        # Guards the buffer manipulation done in _push_command.
        self.lock = thread.allocate_lock()
        # Number of successful push_command calls since the channel was
        # last set or refreshed.
        self.commands_pushed = 0

    def push_command(self, command):
        """Queue ``command`` and try to send everything that is buffered.

        Once ``config.max_events_per_request`` commands have been pushed on
        the current channel, the channel is asked to close when done.

        Raises BufferOverload if the buffer grows beyond
        ``config.max_events_in_buffer``; in that case the push counter is
        left untouched.
        """
        with self.lock:
            self._push_command(command)
        # Only reached when _push_command did not raise.  Note that the
        # counter update happens after the lock has been released.
        self.commands_pushed += 1
        if self.commands_pushed >= self.config.max_events_per_request:
            self.httpchannel.close_when_done()

    def refresh_channel(self, httpchannel):
        """Switch to a new ``httpchannel`` and reset the push counter.

        Buffered commands are not flushed here; sending is only attempted
        from within push_command.
        """
        self.httpchannel = httpchannel
        self.commands_pushed = 0

    def _push_command(self, command):
        """Append ``command`` to the buffer and drain as much as possible.

        Called by push_command with ``self.lock`` held.

        Raises BufferOverload if, after appending, the buffer holds more than
        ``config.max_events_in_buffer`` entries.  The appended command stays
        in the buffer in that case and no send is attempted.
        """
        self.buffer.append(command)
        if len(self.buffer) > self.config.max_events_in_buffer:
            raise BufferOverload()
        while self.buffer:
            try:
                self.httpchannel.push_with_producer(self.buffer[0])
            except socket.error:
                # Channel unusable for now: keep the remaining commands
                # buffered, in order, for a later attempt.
                break
            else:
                # Remove the command only after it was handed over.
                self.buffer.pop(0)