Commit 7d146027 authored by Joel Martin's avatar Joel Martin

Pull websockify 284ef3cc1a54

Including HyBi-07 support and refactor of send/recv.
parent 5210330a
...@@ -5,6 +5,11 @@ Python WebSocket library with support for "wss://" encryption. ...@@ -5,6 +5,11 @@ Python WebSocket library with support for "wss://" encryption.
Copyright 2010 Joel Martin Copyright 2010 Joel Martin
Licensed under LGPL version 3 (see docs/LICENSE.LGPL-3) Licensed under LGPL version 3 (see docs/LICENSE.LGPL-3)
Supports following protocol versions:
- http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75
- http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76
- http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-07
You can make a cert/key with openssl using: You can make a cert/key with openssl using:
openssl req -new -x509 -days 365 -nodes -out self.pem -keyout self.pem openssl req -new -x509 -days 365 -nodes -out self.pem -keyout self.pem
as taken from http://docs.python.org/dev/library/ssl.html#certificates as taken from http://docs.python.org/dev/library/ssl.html#certificates
...@@ -17,9 +22,15 @@ from SimpleHTTPServer import SimpleHTTPRequestHandler ...@@ -17,9 +22,15 @@ from SimpleHTTPServer import SimpleHTTPRequestHandler
from cStringIO import StringIO from cStringIO import StringIO
from base64 import b64encode, b64decode from base64 import b64encode, b64decode
try: try:
from hashlib import md5 from hashlib import md5, sha1
except:
# Support python 2.4
from md5 import md5
from sha import sha as sha1
try:
import numpy, ctypes
except: except:
from md5 import md5 # Support python 2.4 numpy = ctypes = None
from urlparse import urlsplit from urlparse import urlsplit
from cgi import parse_qsl from cgi import parse_qsl
...@@ -29,14 +40,22 @@ class WebSocketServer(object): ...@@ -29,14 +40,22 @@ class WebSocketServer(object):
Must be sub-classed with new_client method definition. Must be sub-classed with new_client method definition.
""" """
server_handshake = """HTTP/1.1 101 Web Socket Protocol Handshake\r buffer_size = 65536
server_handshake_hixie = """HTTP/1.1 101 Web Socket Protocol Handshake\r
Upgrade: WebSocket\r Upgrade: WebSocket\r
Connection: Upgrade\r Connection: Upgrade\r
%sWebSocket-Origin: %s\r %sWebSocket-Origin: %s\r
%sWebSocket-Location: %s://%s%s\r %sWebSocket-Location: %s://%s%s\r
%sWebSocket-Protocol: sample\r """
\r
%s""" server_handshake_hybi = """HTTP/1.1 101 Switching Protocols\r
Upgrade: websocket\r
Connection: Upgrade\r
Sec-WebSocket-Accept: %s\r
"""
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
policy_response = """<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>\n""" policy_response = """<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>\n"""
...@@ -54,7 +73,6 @@ Connection: Upgrade\r ...@@ -54,7 +73,6 @@ Connection: Upgrade\r
self.ssl_only = ssl_only self.ssl_only = ssl_only
self.daemon = daemon self.daemon = daemon
# Make paths settings absolute # Make paths settings absolute
self.cert = os.path.abspath(cert) self.cert = os.path.abspath(cert)
self.key = self.web = self.record = '' self.key = self.web = self.record = ''
...@@ -89,10 +107,10 @@ Connection: Upgrade\r ...@@ -89,10 +107,10 @@ Connection: Upgrade\r
# WebSocketServer static methods # WebSocketServer static methods
# #
@staticmethod @staticmethod
def daemonize(self, keepfd=None): def daemonize(keepfd=None, chdir='/'):
os.umask(0) os.umask(0)
if self.web: if chdir:
os.chdir(self.web) os.chdir(chdir)
else: else:
os.chdir('/') os.chdir('/')
os.setgid(os.getgid()) # relinquish elevations os.setgid(os.getgid()) # relinquish elevations
...@@ -124,18 +142,140 @@ Connection: Upgrade\r ...@@ -124,18 +142,140 @@ Connection: Upgrade\r
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno()) os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())
@staticmethod @staticmethod
def encode(buf): def encode_hybi(buf, opcode, base64=False):
""" Encode a WebSocket packet. """ """ Encode a HyBi style WebSocket frame.
buf = b64encode(buf) Optional opcode:
return "\x00%s\xff" % buf 0x0 - continuation
0x1 - text frame (base64 encode buf)
0x2 - binary frame (use raw buf)
0x8 - connection close
0x9 - ping
0xA - pong
"""
if base64:
buf = b64encode(buf)
b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
payload_len = len(buf)
if payload_len <= 125:
header = struct.pack('>BB', b1, payload_len)
elif payload_len > 125 and payload_len <= 65536:
header = struct.pack('>BBH', b1, 126, payload_len)
elif payload_len >= 65536:
header = struct.pack('>BBQ', b1, 127, payload_len)
#print "Encoded: %s" % repr(header + buf)
return header + buf
@staticmethod @staticmethod
def decode(buf): def decode_hybi(buf, base64=False):
""" Decode WebSocket packets. """ """ Decode HyBi style WebSocket packets.
if buf.count('\xff') > 1: Returns:
return [b64decode(d[1:]) for d in buf.split('\xff')] {'fin' : 0_or_1,
'opcode' : number,
'mask' : 32_bit_number,
'length' : payload_bytes_number,
'payload' : decoded_buffer,
'left' : bytes_left_number,
'close_code' : number,
'close_reason' : string}
"""
ret = {'fin' : 0,
'opcode' : 0,
'mask' : 0,
'length' : 0,
'payload' : None,
'left' : 0,
'close_code' : None,
'close_reason' : None}
blen = len(buf)
ret['left'] = blen
header_len = 2
if blen < header_len:
return ret # Incomplete frame header
b1, b2 = struct.unpack_from(">BB", buf)
ret['opcode'] = b1 & 0x0f
ret['fin'] = (b1 & 0x80) >> 7
has_mask = (b2 & 0x80) >> 7
ret['length'] = b2 & 0x7f
if ret['length'] == 126:
header_len = 4
if blen < header_len:
return ret # Incomplete frame header
(ret['length'],) = struct.unpack_from('>xxH', buf)
elif ret['length'] == 127:
header_len = 10
if blen < header_len:
return ret # Incomplete frame header
(ret['length'],) = struct.unpack_from('>xxQ', buf)
full_len = header_len + has_mask * 4 + ret['length']
if blen < full_len: # Incomplete frame
return ret # Incomplete frame header
# Number of bytes that are part of the next frame(s)
ret['left'] = blen - full_len
# Process 1 frame
if has_mask:
# unmask payload
ret['mask'] = buf[header_len:header_len+4]
b = c = ''
if ret['length'] >= 4:
mask = numpy.frombuffer(buf, dtype=numpy.dtype('<L4'),
offset=header_len, count=1)
data = numpy.frombuffer(buf, dtype=numpy.dtype('<L4'),
offset=header_len + 4, count=int(ret['length'] / 4))
#b = numpy.bitwise_xor(data, mask).data
b = numpy.bitwise_xor(data, mask).tostring()
if ret['length'] % 4:
print "Partial unmask"
mask = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
offset=header_len, count=(ret['length'] % 4))
data = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
offset=full_len - (ret['length'] % 4),
count=(ret['length'] % 4))
c = numpy.bitwise_xor(data, mask).tostring()
ret['payload'] = b + c
else: else:
return [b64decode(buf[1:-1])] print "Unmasked frame:", repr(buf)
ret['payload'] = buf[(header_len + has_mask * 4):full_len]
if base64 and ret['opcode'] in [1, 2]:
try:
ret['payload'] = b64decode(ret['payload'])
except:
print "Exception while b64decoding buffer:", repr(buf)
raise
if ret['opcode'] == 0x08:
if ret['length'] >= 2:
ret['close_code'] = struct.unpack_from(
">H", ret['payload'])
if ret['length'] > 3:
ret['close_reason'] = ret['payload'][2:]
return ret
@staticmethod
def encode_hixie(buf):
return "\x00" + b64encode(buf) + "\xff"
@staticmethod
def decode_hixie(buf):
end = buf.find('\xff')
return {'payload': b64decode(buf[1:end]),
'left': len(buf) - (end + 1)}
@staticmethod @staticmethod
def parse_handshake(handshake): def parse_handshake(handshake):
...@@ -160,7 +300,7 @@ Connection: Upgrade\r ...@@ -160,7 +300,7 @@ Connection: Upgrade\r
@staticmethod @staticmethod
def gen_md5(keys): def gen_md5(keys):
""" Generate hash value for WebSockets handshake v76. """ """ Generate hash value for WebSockets hixie-76. """
key1 = keys['Sec-WebSocket-Key1'] key1 = keys['Sec-WebSocket-Key1']
key2 = keys['Sec-WebSocket-Key2'] key2 = keys['Sec-WebSocket-Key2']
key3 = keys['key3'] key3 = keys['key3']
...@@ -171,7 +311,6 @@ Connection: Upgrade\r ...@@ -171,7 +311,6 @@ Connection: Upgrade\r
return md5(struct.pack('>II8s', num1, num2, key3)).digest() return md5(struct.pack('>II8s', num1, num2, key3)).digest()
# #
# WebSocketServer logging/output functions # WebSocketServer logging/output functions
# #
...@@ -195,6 +334,123 @@ Connection: Upgrade\r ...@@ -195,6 +334,123 @@ Connection: Upgrade\r
# #
# Main WebSocketServer methods # Main WebSocketServer methods
# #
def send_frames(self, bufs=None):
""" Encode and send WebSocket frames. Any frames already
queued will be sent first. If buf is not set then only queued
frames will be sent. Returns the number of pending frames that
could not be fully sent. If returned pending frames is greater
than 0, then the caller should call again when the socket is
ready. """
if bufs:
for buf in bufs:
if self.version.startswith("hybi"):
if self.base64:
self.send_parts.append(self.encode_hybi(buf,
opcode=1, base64=True))
else:
self.send_parts.append(self.encode_hybi(buf,
opcode=2, base64=False))
else:
self.send_parts.append(self.encode_hixie(buf))
while self.send_parts:
# Send pending frames
buf = self.send_parts.pop(0)
sent = self.client.send(buf)
if sent == len(buf):
self.traffic("<")
else:
self.traffic("<.")
self.send_parts.insert(0, buf[sent:])
break
return len(self.send_parts)
def recv_frames(self):
""" Receive and decode WebSocket frames.
Returns:
(bufs_list, closed_string)
"""
closed = False
bufs = []
buf = self.client.recv(self.buffer_size)
if len(buf) == 0:
closed = "Client closed abruptly"
return bufs, closed
if self.recv_part:
# Add partially received frames to current read buffer
buf = self.recv_part + buf
self.recv_part = None
while buf:
if self.version.startswith("hybi"):
frame = self.decode_hybi(buf, base64=self.base64)
#print "Received buf: %s, frame: %s" % (repr(buf), frame)
if frame['payload'] == None:
# Incomplete/partial frame
self.traffic("}.")
if frame['left'] > 0:
self.recv_part = buf[-frame['left']:]
break
else:
if frame['opcode'] == 0x8: # connection close
closed = "Client closed, reason: %s - %s" % (
frame['close_code'],
frame['close_reason'])
break
else:
if buf[0:2] == '\xff\x00':
closed = "Client sent orderly close frame"
break
elif buf[0:2] == '\x00\xff':
buf = buf[2:]
continue # No-op
elif buf.count('\xff') == 0:
# Partial frame
self.traffic("}.")
self.recv_part = buf
break
frame = self.decode_hixie(buf)
self.traffic("}")
bufs.append(frame['payload'])
if frame['left']:
buf = buf[-frame['left']:]
else:
buf = ''
return bufs, closed
def send_close(self, code=None, reason=''):
""" Send a WebSocket orderly close frame. """
if self.version.startswith("hybi"):
msg = ''
if code != None:
msg = struct.pack(">H%ds" % (len(reason)), code)
buf = self.encode_hybi(msg, opcode=0x08, base64=False)
self.client.send(buf)
elif self.version == "hixie-76":
buf = self.encode_hixie('\xff\x00')
self.client.send(buf)
# No orderly close for 75
def do_handshake(self, sock, address): def do_handshake(self, sock, address):
""" """
...@@ -222,7 +478,7 @@ Connection: Upgrade\r ...@@ -222,7 +478,7 @@ Connection: Upgrade\r
# Peek, but do not read the data so that we have a opportunity # Peek, but do not read the data so that we have a opportunity
# to SSL wrap the socket first # to SSL wrap the socket first
handshake = sock.recv(1024, socket.MSG_PEEK) handshake = sock.recv(1024, socket.MSG_PEEK)
#self.msg("Handshake [%s]" % repr(handshake)) self.msg("Handshake [%s]" % handshake)
if handshake == "": if handshake == "":
raise self.EClose("ignoring empty handshake") raise self.EClose("ignoring empty handshake")
...@@ -268,8 +524,9 @@ Connection: Upgrade\r ...@@ -268,8 +524,9 @@ Connection: Upgrade\r
raise self.EClose("Client closed during handshake") raise self.EClose("Client closed during handshake")
# Check for and handle normal web requests # Check for and handle normal web requests
if handshake.startswith('GET ') and \ if (handshake.startswith('GET ') and
handshake.find('Upgrade: WebSocket\r\n') == -1: handshake.find('Upgrade: WebSocket\r\n') == -1 and
handshake.find('Upgrade: websocket\r\n') == -1):
if not self.web: if not self.web:
raise self.EClose("Normal web request received but disallowed") raise self.EClose("Normal web request received but disallowed")
sh = SplitHTTPHandler(handshake, retsock, address) sh = SplitHTTPHandler(handshake, retsock, address)
...@@ -282,26 +539,73 @@ Connection: Upgrade\r ...@@ -282,26 +539,73 @@ Connection: Upgrade\r
#self.msg("handshake: " + repr(handshake)) #self.msg("handshake: " + repr(handshake))
# Parse client WebSockets handshake # Parse client WebSockets handshake
self.headers = self.parse_handshake(handshake) h = self.headers = self.parse_handshake(handshake)
prot = 'WebSocket-Protocol'
protocols = h.get('Sec-'+prot, h.get(prot, '')).split(',')
ver = h.get('Sec-WebSocket-Version')
if ver:
# HyBi/IETF version of the protocol
if not numpy or not ctypes:
self.EClose("Python numpy and ctypes modules required for HyBi-07 or greater")
if ver == '7':
self.version = "hybi-07"
else:
raise self.EClose('Unsupported protocol version %s' % ver)
key = h['Sec-WebSocket-Key']
# Choose binary if client supports it
if 'binary' in protocols:
self.base64 = False
elif 'base64' in protocols:
self.base64 = True
else:
raise self.EClose("Client must support 'binary' or 'base64' protocol")
# Generate the hash value for the accept header
accept = b64encode(sha1(key + self.GUID).digest())
response = self.server_handshake_hybi % accept
if self.base64:
response += "Sec-WebSocket-Protocol: base64\r\n"
else:
response += "Sec-WebSocket-Protocol: binary\r\n"
response += "\r\n"
if self.headers.get('key3'):
trailer = self.gen_md5(self.headers)
pre = "Sec-"
ver = 76
else: else:
trailer = "" # Hixie version of the protocol (75 or 76)
pre = ""
ver = 75 if h.get('key3'):
trailer = self.gen_md5(h)
pre = "Sec-"
self.version = "hixie-76"
else:
trailer = ""
pre = ""
self.version = "hixie-75"
# We only support base64 in Hixie era
self.base64 = True
response = self.server_handshake_hixie % (pre,
h['Origin'], pre, scheme, h['Host'], h['path'])
if 'base64' in protocols:
response += "%sWebSocket-Protocol: base64\r\n" % pre
else:
self.msg("Warning: client does not report 'base64' protocol support")
response += "\r\n" + trailer
self.msg("%s: %s WebSocket connection (version %s)" self.msg("%s: %s WebSocket connection" % (address[0], stype))
% (address[0], stype, ver)) self.msg("%s: Version %s, base64: '%s'" % (address[0],
self.version, self.base64))
# Send server WebSockets handshake response # Send server WebSockets handshake response
response = self.server_handshake % (pre, self.msg("sending response [%s]" % response)
self.headers['Origin'], pre, scheme,
self.headers['Host'], self.headers['path'], pre,
trailer)
#self.msg("sending response:", repr(response))
retsock.send(response) retsock.send(response)
# Return the WebSockets socket which may be SSL wrapped # Return the WebSockets socket which may be SSL wrapped
...@@ -357,7 +661,7 @@ Connection: Upgrade\r ...@@ -357,7 +661,7 @@ Connection: Upgrade\r
lsock.listen(100) lsock.listen(100)
if self.daemon: if self.daemon:
self.daemonize(self, keepfd=lsock.fileno()) self.daemonize(keepfd=lsock.fileno(), chdir=self.web)
self.started() # Some things need to happen after daemonizing self.started() # Some things need to happen after daemonizing
...@@ -368,7 +672,8 @@ Connection: Upgrade\r ...@@ -368,7 +672,8 @@ Connection: Upgrade\r
while True: while True:
try: try:
try: try:
csock = startsock = None self.client = None
startsock = None
pid = err = 0 pid = err = 0
try: try:
...@@ -394,9 +699,14 @@ Connection: Upgrade\r ...@@ -394,9 +699,14 @@ Connection: Upgrade\r
pid = os.fork() pid = os.fork()
if pid == 0: if pid == 0:
# Initialize per client settings
self.send_parts = []
self.recv_part = None
self.base64 = False
# handler process # handler process
csock = self.do_handshake(startsock, address) self.client = self.do_handshake(
self.new_client(csock) startsock, address)
self.new_client()
else: else:
# parent process # parent process
self.handler_id += 1 self.handler_id += 1
...@@ -413,8 +723,8 @@ Connection: Upgrade\r ...@@ -413,8 +723,8 @@ Connection: Upgrade\r
self.msg(traceback.format_exc()) self.msg(traceback.format_exc())
finally: finally:
if csock and csock != startsock: if self.client and self.client != startsock:
csock.close() self.client.close()
if startsock: if startsock:
startsock.close() startsock.close()
......
...@@ -133,7 +133,7 @@ Traffic Legend: ...@@ -133,7 +133,7 @@ Traffic Legend:
# will be run in a separate forked process for each connection. # will be run in a separate forked process for each connection.
# #
def new_client(self, client): def new_client(self):
""" """
Called after a new WebSocket connection has been established. Called after a new WebSocket connection has been established.
""" """
...@@ -156,9 +156,9 @@ Traffic Legend: ...@@ -156,9 +156,9 @@ Traffic Legend:
if self.verbose and not self.daemon: if self.verbose and not self.daemon:
print self.traffic_legend print self.traffic_legend
# Stat proxying # Start proxying
try: try:
self.do_proxy(client, tsock) self.do_proxy(tsock)
except: except:
if tsock: if tsock:
tsock.close() tsock.close()
...@@ -169,14 +169,14 @@ Traffic Legend: ...@@ -169,14 +169,14 @@ Traffic Legend:
self.rec.close() self.rec.close()
raise raise
def do_proxy(self, client, target): def do_proxy(self, target):
""" """
Proxy client WebSocket to normal target socket. Proxy client WebSocket to normal target socket.
""" """
cqueue = [] cqueue = []
cpartial = "" c_pend = 0
tqueue = [] tqueue = []
rlist = [client, target] rlist = [self.client, target]
tstart = int(time.time()*1000) tstart = int(time.time()*1000)
while True: while True:
...@@ -184,7 +184,7 @@ Traffic Legend: ...@@ -184,7 +184,7 @@ Traffic Legend:
tdelta = int(time.time()*1000) - tstart tdelta = int(time.time()*1000) - tstart
if tqueue: wlist.append(target) if tqueue: wlist.append(target)
if cqueue: wlist.append(client) if cqueue or c_pend: wlist.append(self.client)
ins, outs, excepts = select(rlist, wlist, [], 1) ins, outs, excepts = select(rlist, wlist, [], 1)
if excepts: raise Exception("Socket exception") if excepts: raise Exception("Socket exception")
...@@ -199,53 +199,40 @@ Traffic Legend: ...@@ -199,53 +199,40 @@ Traffic Legend:
tqueue.insert(0, dat[sent:]) tqueue.insert(0, dat[sent:])
self.traffic(".>") self.traffic(".>")
if client in outs:
# Send queued target data to the client
dat = cqueue.pop(0)
sent = client.send(dat)
if sent == len(dat):
self.traffic("<")
if self.rec:
self.rec.write("%s,\n" %
repr("{%s{" % tdelta + dat[1:-1]))
else:
cqueue.insert(0, dat[sent:])
self.traffic("<.")
if target in ins: if target in ins:
# Receive target data, encode it and queue for client # Receive target data, encode it and queue for client
buf = target.recv(self.buffer_size) buf = target.recv(self.buffer_size)
if len(buf) == 0: raise self.EClose("Target closed") if len(buf) == 0: raise self.EClose("Target closed")
cqueue.append(self.encode(buf)) cqueue.append(buf)
self.traffic("{") self.traffic("{")
if client in ins:
if self.client in outs:
# Send queued target data to the client
c_pend = self.send_frames(cqueue)
cqueue = []
#if self.rec:
# self.rec.write("%s,\n" %
# repr("{%s{" % tdelta + dat[1:-1]))
if self.client in ins:
# Receive client data, decode it, and queue for target # Receive client data, decode it, and queue for target
buf = client.recv(self.buffer_size) bufs, closed = self.recv_frames()
if len(buf) == 0: raise self.EClose("Client closed") tqueue.extend(bufs)
if buf == '\xff\x00': #if self.rec:
raise self.EClose("Client sent orderly close frame") # for b in bufs:
elif buf[-1] == '\xff': # self.rec.write(
if buf.count('\xff') > 1: # repr("}%s}%s" % (tdelta, b)) + ",\n")
self.traffic(str(buf.count('\xff')))
self.traffic("}") if closed:
if self.rec: # TODO: What about blocking on client socket?
self.rec.write("%s,\n" % self.send_close()
(repr("}%s}" % tdelta + buf[1:-1]))) raise self.EClose(closed)
if cpartial:
# Prepend saved partial and decode frame(s)
tqueue.extend(self.decode(cpartial + buf))
cpartial = ""
else:
# decode frame(s)
tqueue.extend(self.decode(buf))
else:
# Save off partial WebSockets frame
self.traffic(".}")
cpartial = cpartial + buf
if __name__ == '__main__': if __name__ == '__main__':
usage = "\n %prog [options]" usage = "\n %prog [options]"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment