加入星計(jì)劃,您可以享受以下權(quán)益:

  • 創(chuàng)作內(nèi)容快速變現(xiàn)
  • 行業(yè)影響力擴(kuò)散
  • 作品版權(quán)保護(hù)
  • 300W+ 專(zhuān)業(yè)用戶(hù)
  • 1.5W+ 優(yōu)質(zhì)創(chuàng)作者
  • 5000+ 長(zhǎng)期合作伙伴
立即加入
  • 正文
    • 三、定制服務(wù)器
    • ?四、程序
  • 相關(guān)推薦
  • 電子產(chǎn)業(yè)圖譜
申請(qǐng)入駐 產(chǎn)業(yè)圖譜

python實(shí)現(xiàn)FINS協(xié)議的TCP服務(wù)端(篇二)

12/07 08:05
788
閱讀需 14 分鐘
加入交流群
掃碼加入
獲取工程師必備禮包
參與熱點(diǎn)資訊討論

python實(shí)現(xiàn)FINS協(xié)議的TCP服務(wù)端是一件稍微麻煩點(diǎn)的事情。它不像modbusTCP那樣,可以使用現(xiàn)成的pymodbus模塊去實(shí)現(xiàn)。但是,我們可以根據(jù)協(xié)議幀進(jìn)行組包,自己去實(shí)現(xiàn)幀的格式,而這一切可以基于socket模塊。本文為第二篇。

三、定制服務(wù)器

1、對(duì)比讀寫(xiě)保持寄存器的請(qǐng)求

通過(guò)對(duì)比請(qǐng)求,來(lái)判斷讀寫(xiě)請(qǐng)求,以對(duì)應(yīng)不同的響應(yīng)。

# 以寫(xiě)為例,賦予讀的對(duì)比

Header:46 49 4E 53 固定值

Length:00 00 02 30 包的長(zhǎng)度  --  根據(jù)實(shí)際情況改變

Command:00 00 00 02 固定值

Error Code:00 00 00 00 固定值

ICF:固定值80

RSV:固定值00

GCT:固定值02

DNA:目標(biāo)網(wǎng)絡(luò)號(hào)00

DA1:目標(biāo)節(jié)點(diǎn)號(hào)01

DA2:目標(biāo)單元號(hào)00

SNA:源網(wǎng)絡(luò)號(hào)00

SA1:源節(jié)點(diǎn)號(hào)01

SA2:目標(biāo)單元號(hào)00

SID:源網(wǎng)絡(luò)號(hào) 3E(也可能是其他) -- 根據(jù)獲取數(shù)量從00 開(kāi)始變FF后再為00,可忽略

MRC:01

SRC:02  -- 讀的時(shí)候?yàn)?1,寫(xiě)的時(shí)候?yàn)?2

Area:82  -- 保持寄存器地址對(duì)應(yīng)82

Address:03 EC 00  -- 實(shí)際地址+位地址

length:01 0B  -- 寫(xiě)的長(zhǎng)度可變!讀的長(zhǎng)度也可變?

value:...

2、編寫(xiě)程序注意事項(xiàng)

由于一個(gè)請(qǐng)求是以2個(gè)請(qǐng)求或多個(gè)請(qǐng)求進(jìn)行的,因此在編寫(xiě)服務(wù)器的時(shí)候,確實(shí)加了一些小困難,簡(jiǎn)單解釋如下:

(1)先發(fā)請(qǐng)求頭

(2)再發(fā)指令

但對(duì)于響應(yīng)來(lái)說(shuō),卻是一個(gè)完整的包:

(1)請(qǐng)求頭與指令一起發(fā)

?四、程序

1、代碼

import socket

def recognition_frame(req_bytes_frame, Trigger):
    get_frame = req_bytes_frame.hex().upper()
    print("設(shè)備請(qǐng)求:", get_frame)
    # 判斷是否為握手命令
    if get_frame == "46494E530000000C000000000000000000000000":
        response = "46494E530000001000000000000000000000000100000001"
        return bytes().fromhex(response)
    # 判斷是否為其他FINS命令的請(qǐng)求頭,只要是請(qǐng)求頭都只反應(yīng)空
    elif "46494E53" in get_frame:  # 收到FINS的請(qǐng)求頭 == "46494E530000001A0000000200000000" 或 其他請(qǐng)求頭
        print("只收到請(qǐng)求頭,響應(yīng)將為空!")
        response = ""
        return bytes().fromhex(response)
    else:
        SRC_value = get_frame[22:24]  # 判斷讀寫(xiě),01為讀,02為寫(xiě)
        Area_value = get_frame[24:26]  # 判斷寄存器區(qū)域,82為保持寄存器
        # print(SRC_value)
        # print(Area_value)
        if SRC_value == "01":
            if Area_value == "82":
                response_1 = "46494E5300000018000000000000000000000000000000000000010100000001"  # Trigger位為T(mén)rue
                response_0 = "46494E5300000018000000000000000000000000000000000000010100000000"  # Trigger位為False
                if Trigger == True:
                    return bytes().fromhex(response_1)
                else:
                    return bytes().fromhex(response_0)
            else:
                raise ValueError("Area_value is error!")
        elif SRC_value == "02":
            if Area_value == "82":
                print("***************************************")
                # 寫(xiě)保持寄存器的響應(yīng)
                print("掃碼器寫(xiě)入的結(jié)果數(shù)據(jù):", bytes().fromhex(get_frame))
                response = "46494E530000001600000000000000000000000000000000000001020000"
                return bytes().fromhex(response)
            else:
                raise ValueError("Area_value is error!")
        else:
            raise ValueError("SRC_value is error!")

if __name__ == "__main__":
    DM_start = 1000

    # 創(chuàng)建FINS服務(wù)端
    # 創(chuàng)建一個(gè)TCP/IP套接字
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 綁定套接字到特定地址和端口
    server_address = ('192.168.1.188', 9600)  # 服務(wù)器地址和端口
    server_socket.bind(server_address)
    # 監(jiān)聽(tīng)連接
    server_socket.listen(1)
    print('等待客戶(hù)端連接...')
    connection, client_address = server_socket.accept()
    print('客戶(hù)端已連接:', client_address)

    try:
        num = 0  # 觸發(fā)標(biāo)志
        Trigger_rec = 0  # Trigger置為T(mén)rue時(shí),對(duì)應(yīng)變?yōu)?,表示觸發(fā)一次
        response = "" # 響應(yīng)
        while True:
            # 接收客戶(hù)端請(qǐng)求
            request = connection.recv(1024)
            if request:
                # 如果收到的不是請(qǐng)求頭
                if "8000020001000001" in request.hex():
                    # print(request.hex()[22:24])
                    # 實(shí)現(xiàn)掃碼觸發(fā)
                    if request.hex()[22:24] == "01":  # 判斷讀寫(xiě),01為讀觸發(fā)指令,02為寫(xiě)觸發(fā)結(jié)果
                        if Trigger_rec != 2:
                            Trigger_rec += 1
                        if Trigger_rec == 1:
                            response = recognition_frame(request, Trigger=False)  # 先清空觸發(fā)信號(hào)
                            connection.sendall(response)
                        elif Trigger_rec == 2:  # 復(fù)位Trigger信號(hào)
                            response = recognition_frame(request, Trigger=True)  # 再置位觸發(fā)信號(hào)
                            connection.sendall(response)
                    # 實(shí)現(xiàn)結(jié)果接收
                    elif request.hex()[22:24] == "02":
                        print(request.hex())
                        # print("---------------", int(request.hex()[26:30], 16))
                        if int(request.hex()[26:30], 16) == DM_start + 4:
                            if any(c != '0' for c in request.hex()[36:]):  # 不全為0
                                print("掃碼結(jié)果:", request.hex()[36:])
                                num += 1
                                Trigger_rec = 0
                            else:
                                response = recognition_frame(request, Trigger=True)
                                connection.sendall(response)
                                print("還沒(méi)有收到結(jié)果,繼續(xù)等待掃碼結(jié)果!")
                        else:
                            response = recognition_frame(request, Trigger=True)
                            connection.sendall(response)
                # 處理其他請(qǐng)求
                else:
                    response = recognition_frame(request, Trigger=True)
                    connection.sendall(response)
                print("服務(wù)響應(yīng):", response.hex())
                if num == 1:
                    assert bytes().fromhex(request.hex()[36:]).decode() == "NG", "實(shí)際掃碼結(jié)果為:{},不符合預(yù)期".format(bytes().fromhex(request.hex()[36:]).decode())
                    break
                request = False
    finally:
        # 清理連接
        connection.close()

2、解釋

這段代碼是一個(gè)使用FINS協(xié)議的服務(wù)器端程序。它監(jiān)聽(tīng)指定地址和端口,接收客戶(hù)端請(qǐng)求,根據(jù)請(qǐng)求內(nèi)容作出相應(yīng)的響應(yīng)。以下是對(duì)主要部分的解釋?zhuān)?/p>

(1)recognition_frame 函數(shù):

接收一個(gè) req_bytes_frame 參數(shù),這是客戶(hù)端請(qǐng)求的字節(jié)表示。

get_frame 變量將字節(jié)表示轉(zhuǎn)換為大寫(xiě)的十六進(jìn)制字符串。

通過(guò)一系列條件判斷,判斷請(qǐng)求類(lèi)型并返回相應(yīng)的響應(yīng)。
(2)if __name__ == "__main__": 部分:

初始化一些變量,如 DM_start、num、Trigger_rec 和 response。

創(chuàng)建一個(gè) TCP 服務(wù)器套接字,綁定地址和端口,然后監(jiān)聽(tīng)連接。

在一個(gè)無(wú)限循環(huán)中,接收客戶(hù)端請(qǐng)求,判斷請(qǐng)求類(lèi)型并發(fā)送相應(yīng)的響應(yīng)。

掃碼觸發(fā)和結(jié)果接收部分:

如果接收到的請(qǐng)求的十六進(jìn)制表示包含特定的模式("8000020001000001"),則執(zhí)行掃碼觸發(fā)或結(jié)果接收的邏輯。

①觸發(fā)時(shí),通過(guò) recognition_frame 函數(shù)發(fā)送相應(yīng)的響應(yīng)。
②結(jié)果接收時(shí),判斷是否是指定寄存器的寫(xiě)入,如果寫(xiě)入的內(nèi)容不全為零,則認(rèn)為收到了掃碼結(jié)果。
recognition_frame 函數(shù)中的異常處理:

如果在解析請(qǐng)求時(shí)發(fā)現(xiàn)不符合預(yù)期的情況,拋出 ValueError 異常。

finally 塊:

在程序結(jié)束時(shí)關(guān)閉連接。

總體來(lái)說(shuō),這是一個(gè)基于 FINS 協(xié)議的服務(wù)器程序,主要用于處理掃碼觸發(fā)和結(jié)果接收,并通過(guò) FINS 協(xié)議進(jìn)行通信。

相關(guān)推薦

電子產(chǎn)業(yè)圖譜