实例介绍
【实例简介】
1、本opc驱动采用订阅模式来读写OPC
2、通过订阅MQTT来接收外部命令以及将读到的数据写出
【核心代码】
using Newtonsoft.Json;
using OPCAutomation;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace OpcDriverTest
{
class OpcService
{
MqttClient mqttClient;
OPCServer opcServer;
OPCGroups opcGroups;
List<OPCItem> opcItemList;
OPCGroup opcGroup;
Dictionary<string,int> opcTagDictionary=new Dictionary<string, int>();
string[] opcTags;
public OpcService()
{
startMqtt();
startOpc();
}
private void startMqtt()
{
//获取MQTT配置
string url = ConfigurationSettings.AppSettings["mqtt.url"].ToString();
string clientId = ConfigurationSettings.AppSettings["mqtt.clientId"].ToString();
int port = Convert.ToInt32(ConfigurationSettings.AppSettings["mqtt.port"]);
string receiveTopic = ConfigurationSettings.AppSettings["mqtt.topic.opc.receive"].ToString();
mqttClient = new MqttClient(url);
mqttClient.MqttMsgPublishReceived = client_MqttMsgPublishReceived;
//连接MQTT
mqttClient.Connect(clientId);
//订阅主题,用来接收上层发来的命令
mqttClient.Subscribe(new string[] { receiveTopic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
}
private void startOpc()
{
string opcServerName = ConfigurationSettings.AppSettings["opc.serverName"].ToString();
opcServer = new OPCServer();
connectOPC("", opcServerName);
//配置组
opcGroups = opcServer.OPCGroups;
opcGroups.DefaultGroupIsActive = true;
opcGroups.DefaultGroupDeadband = 0;
opcGroups.DefaultGroupUpdateRate = 200;
//获取配置的标记
string tags = ConfigurationSettings.AppSettings["opc.tags"].ToString();
string group = ConfigurationSettings.AppSettings["opc.group".ToString()];
opcTags= tags.Split(',');
//转成Dictionary,方便查询用
for (int i = 0; i < opcTags.Length; i )
{
opcTagDictionary.Add(opcTags[i], i);
}
List<string> itemList = new List<string>();
addListenGroup(group, opcTags);
}
private bool connectOPC(string remoteServerName, string remoteServerIP)
{
try
{
opcServer.Connect(remoteServerIP, remoteServerName);//连接本地服务器:服务器名 主机名或IP
if (opcServer.ServerState == (int)OPCServerState.OPCRunning)
{
Console.WriteLine("已连接到:{0}", opcServer.ServerName);
}
else
{
//这里你可以根据返回的状态来自定义显示信息,请查看自动化接口API文档
Console.WriteLine("状态:{0}", opcServer.ServerState.ToString());
}
opcServer.ServerShutDown = ServerShutDown;//服务器断开事件
}
catch (Exception err)
{
Console.WriteLine("连接远程服务器出现错误:{0}" err.Message);
return false;
}
return true;
}
private void ServerShutDown(string Reason)
{
Console.WriteLine("OPC服务器已经先行断开!");
}
//注册监听opc item
public void addListenGroup(string group, string[] itemList)
{
opcGroup = opcGroups.Add(group);
opcGroup.UpdateRate = 100;
opcGroup.IsSubscribed = true;
opcGroup.DataChange = new DIOPCGroupEvent_DataChangeEventHandler(OpcGroup_DataChange);
opcGroup.AsyncWriteComplete = OpcGroup_AsyncWriteComplete;
OPCItems opcItems = opcGroup.OPCItems;
opcItemList = new List<OPCItem>();
for (int i = 1; i <= itemList.Length; i )
{
string itemName = itemList[i - 1];
int clientHande = i;
OPCItem item= opcItems.AddItem(itemName, clientHande);
opcItemList.Add(item);
}
}
//opc 数据变化事件
private void OpcGroup_DataChange(int TransactionID, int NumItems, ref Array ClientHandles, ref Array ItemValues, ref Array Qualities, ref Array TimeStamps)
{
Dictionary<string, object> param = new Dictionary<string, object>();
for (int i = 1; i <= NumItems; i )
{
Console.WriteLine( "Tran:" TransactionID.ToString() " CH:" ClientHandles.GetValue(i).ToString() " value:" ItemValues.GetValue(i) " timestamps:" TimeStamps.GetValue(i).ToString());
int index = (int)ClientHandles.GetValue(i)-1; //得到数据的index,通过Index得知是哪个数据
param.Add(opcTags[index], ItemValues.GetValue(i));
//opcAsyncWrite(opcItemList[1], ItemValues.GetValue(i).ToString()); //异步写
//opcItemList[1].Write(ItemValues.GetValue(i).ToString()); //同步写
}
//写入mqtt
string publishTopic = ConfigurationSettings.AppSettings["mqtt.topic.opc.publish"].ToString();
string json = JsonConvert.SerializeObject(param);
mqttClient.Publish(publishTopic, Encoding.UTF8.GetBytes(json));
}
//单个写
private void opcAsyncWrite(OPCItem item,string value)
{
int[] temp = new int[2] { 0, item.ServerHandle };
Array serverHandles = (Array)temp;
object[] valueTemp = new object[2] { "", value };
Array values = (Array)valueTemp;
Array Errors;
int cancelID;
opcGroup.AsyncWrite(1, ref serverHandles, ref values, out Errors, 2009, out cancelID);
}
//单个写(无法批量写)
private void opcAsyncWrite(Array serverHandles, Array values)
{
Array Errors;
int cancelID;
opcGroup.AsyncWrite(1, ref serverHandles, ref values, out Errors, 2009, out cancelID);
}
//opc异步写回调消息
private void OpcGroup_AsyncWriteComplete(int TransactionID, int NumItems, ref Array ClientHandles, ref Array Errors)
{
for (int i = 1; i <= NumItems; i )
{
//lblState.Text = "Tran:" TransactionID.ToString() " CH:" ClientHandles.GetValue(i).ToString() " Error:" Errors.GetValue(i).ToString();
Console.WriteLine("Tran:" TransactionID.ToString() " CH:" ClientHandles.GetValue(i).ToString() " Error:" Errors.GetValue(i).ToString());
}
}
//收到MQTT消息
private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
try
{
string message= Encoding.UTF8.GetString(e.Message);
Dictionary<string, object> param = JsonConvert.DeserializeObject<Dictionary<string, object>>(message);
foreach (var item in param)
{
int[] sererHandles = new int[param.Count 1];
object[] values = new object[param.Count 1];
//获取opc item序号
try
{
int index = opcTagDictionary[item.Key];
//获取opc item
sererHandles.SetValue(opcItemList[index].ServerHandle, 1);//opc是从1号开始
values.SetValue(item.Value, 1);
opcAsyncWrite(sererHandles, values);
}
catch (Exception ex )
{
Console.WriteLine("异常:未在点表中找到" item.Key ",已被忽略");
}
}
}
catch (Exception e1) {
Console.WriteLine(e1.Message);
}
}
}
}
小贴士
感谢您为本站写下的评论,您的评论对其它用户来说具有重要的参考价值,所以请认真填写。
- 类似“顶”、“沙发”之类没有营养的文字,对勤劳贡献的楼主来说是令人沮丧的反馈信息。
- 相信您也不想看到一排文字/表情墙,所以请不要反馈意义不大的重复字符,也请尽量不要纯表情的回复。
- 提问之前请再仔细看一遍楼主的说明,或许是您遗漏了。
- 请勿到处挖坑绊人、招贴广告。既占空间让人厌烦,又没人会搭理,于人于己都无利。
关于好例子网
本站旨在为广大IT学习爱好者提供一个非营利性互相学习交流分享平台。本站所有资源都可以被免费获取学习研究。本站资源来自网友分享,对搜索内容的合法性不具有预见性、识别性、控制性,仅供学习研究,请务必在下载后24小时内给予删除,不得用于其他任何用途,否则后果自负。基于互联网的特殊性,平台无法对用户传输的作品、信息、内容的权属或合法性、安全性、合规性、真实性、科学性、完整权、有效性等进行实质审查;无论平台是否已进行审查,用户均应自行承担因其传输的作品、信息、内容而可能或已经产生的侵权或权属纠纷等法律责任。本站所有资源不代表本站的观点或立场,基于网友分享,根据中国法律《信息网络传播权保护条例》第二十二与二十三条之规定,若资源存在侵权或相关问题请联系本站客服人员,点此联系我们。关于更多版权及免责申明参见 版权及免责申明


网友评论
我要评论