Files
rfid_/venv/lib/python3.11/site-packages/chafon_rfid/transport.py
2025-04-15 12:15:27 +02:00

76 lines
1.7 KiB
Python

#!/usr/bin/env python
import abc
import socket
class BaseTransport(object):
__metaclass__ = abc.ABCMeta
read_bytecount = 0x100
def __init__(self):
raise NotImplementedError
@abc.abstractmethod
def read_bytes(self, length):
raise NotImplementedError
@abc.abstractmethod
def write_bytes(self, byte_array):
raise NotImplementedError
def read(self):
return self.read_bytes(self.read_bytecount)
def read_frame(self):
length_bytes = self.read_bytes(1)
frame_length = length_bytes[0]
data = length_bytes + self.read_bytes(frame_length)
return bytearray(data)
def write(self, byte_array):
self.write_bytes(byte_array)
class TcpTransport(BaseTransport):
buffer_size = 0xFF
def __init__(self, reader_addr, reader_port, timeout=5, auto_connect=False):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(timeout)
self.reader_addr = reader_addr
self.reader_port = reader_port
if auto_connect:
self.connect()
def connect(self):
self.socket.connect((self.reader_addr, self.reader_port))
def read_bytes(self, length):
return self.socket.recv(length)
def write_bytes(self, byte_array):
self.socket.sendall(byte_array)
def close(self):
self.socket.close()
class MockTransport(BaseTransport):
def __init__(self, data):
self.pointer = 0
self.data = bytes(data)
def read_bytes(self, length):
data = self.data[self.pointer:self.pointer+length]
self.pointer += length
return data
def write_bytes(self, byte_array):
pass
def close(self):
pass