复杂网络研究中的SIR传播模型(Python实现)
SIR
import random
'''
程序主要功能
输入:网络图邻接矩阵,需要被设置为感染源的节点序列,感染率,免疫率,迭代次数step
输出:被设置为感染源的节点序列的SIR感染情况---每次的迭代结果(I+R)/n
'''
def update_node_status(graph, node, beta, gamma):
"""
更新节点状态
:param graph: 网络图
:param node: 节点序数
:param beta: 感染率
:param gamma: 免疫率
"""
# 如果当前节点状态为 感染者(I) 有概率gamma变为 免疫者(R)
if graph.nodes[node]['status'] == 'I':
p = random.random()
if p < gamma:
graph.nodes[node]['status'] = 'R'
# 如果当前节点状态为 易感染者(S) 有概率beta变为 感染者(I)
if graph.nodes[node]['status'] == 'S':
# 获取当前节点的邻居节点
# 无向图:G.neighbors(node)
# 有向图:G.predecessors(node),前驱邻居节点,即指向该节点的节点;G.successors(node),后继邻居节点,即该节点指向的节点。
neighbors = list(graph.predecessors(node))
# 对当前节点的邻居节点进行遍历
for neighbor in neighbors:
# 邻居节点中存在 感染者(I),则该节点有概率被感染为 感染者(I)
if graph.nodes[neighbor]['status'] == 'I':
p = random.random()
if p < beta:
graph.nodes[node]['status'] = 'I'
break
def count_node(graph):
"""
计算当前图内各个状态节点的数目
:param graph: 输入图
:return: 各个状态(S、I、R)的节点数目
"""
s_num, i_num, r_num = 0, 0, 0
for node in graph:
if graph.nodes[node]['status'] == 'S':
s_num += 1
elif graph.nodes[node]['status'] == 'I':
i_num += 1
else:
r_num += 1
return s_num, i_num, r_num
def SIR_network(graph, source, beta, gamma, step):
"""
获得感染源的节点序列的SIR感染情况
:param graph: networkx创建的网络
:param source: 需要被设置为感染源的节点Id所构成的序列
:param beta: 感染率
:param gamma: 免疫率
:param step: 迭代次数
"""
n = graph.number_of_nodes() # 网络节点个数
sir_values = [] # 存储每一次迭代后网络中感染节点数I+免疫节点数R的总和
# 初始化节点状态
for node in graph:
graph.nodes[node]['status'] = 'S' # 将所有节点的状态设置为 易感者(S)
# 设置初始感染源
for node in source:
graph.nodes[node]['status'] = 'I' # 将感染源序列中的节点设置为感染源,状态设置为 感染者(I)
# 记录初始状态
sir_values.append(len(source) / n)
# 开始迭代感染
for s in range(step):
# 针对对每个节点进行状态更新以完成本次迭代
for node in graph:
update_node_status(graph, node, beta, gamma) # 针对node号节点进行SIR过程
s, i, r = count_node(graph) # 得到本次迭代结束后各个状态(S、I、R)的节点数目
sir = (i + r) / n # 该节点的sir值为迭代结束后 感染节点数i+免疫节点数r
sir_values.append(sir) # 将本次迭代的sir值加入数组
return sir_values
应用
-
更多应用请查看作者的其它博文 !
-
计算n次独立实验中各方法SIR传播范围的平均值
if __name__ == '__main__': # 读取网络数据 # adj = np.loadtxt('adj.txt', dtype=np.int) # 网络的邻接矩阵 # edges = np.loadtxt('edges.txt', dtype=str) # 网络的邻接表/边列表 adj = np.loadtxt('adj.txt', dtype=np.int) # 网络的邻接矩阵 # 生成网络图(此处注意根据网络类型调整SIR代码中邻居节点的获取方式) # 无向图:graph = nx.from_numpy_matrix(adj) # 有向图:graph = nx.DiGraph(adj) # 根据连边生成网络: # 1. graph = nx.Graph() # 生成网络图对象 # 2. graph.add_edges_from(edges) # 添加连边 graph = nx.DiGraph(adj) # 创建有向图 metheds = pd.read_csv('多方法排序.csv') # 多种方法(a,b,c,d)的排序结果 # 各方法的对应列 method_a = metheds.iloc[:, 1] method_b = metheds.iloc[:, 2] method_c = metheds.iloc[:, 3] method_d = metheds.iloc[:, 4] ''' 初始化存储每次SIR传播范围的csv文件 本代码中分别对Top-5,10,15的节点做实验 ''' for top_k in range(5, 20, 5): file = open('result/Top-' + str(top_k) + ' SIR.csv', "w", newline="") content = csv.writer(file) content.writerow(['a','b','c','d']) #写入文件列标题(各方法的名称) file.close() print('Top-' + str(top_k) + ' SIR.csv 已初始化!') ''' 进行n次实验取平均值 ''' n = 1000 for i in range(n): print('第 ' + str(i + 1) + ' 次实验') for top_k in range(5, 20, 5): print('Top-' + str(top_k) + '...') # SIR参数设置 beta = 0.15 # 感染率 gamma = 0.5 # 免疫率 step = 20 # 迭代次数 # top-k序列 source_a = method_a.argsort()[::-1][0:top_k] source_b = method_b.argsort()[::-1][0:top_k] source_c = method_c.argsort()[::-1][0:top_k] source_d = method_d.argsort()[::-1][0:top_k] # Top-k节点的感染情况 sir_a = SIR_network(graph, source_a , beta, gamma, step) sir_b = SIR_network(graph, source_b , beta, gamma, step) sir_c = SIR_network(graph, source_c , beta, gamma, step) sir_d = SIR_network(graph, source_d , beta, gamma, step) # 存储每次实验的最终感染情况 file = open('result/Top-' + str(top_k) + ' SIR.csv') reader = csv.reader(file) original = list(reader) file1 = open('result/Top-' + str(top_k) + ' SIR.csv', "w", newline="") content = csv.writer(file1) # 存储文件中的原始数据 for row in original: content.writerow(row) # 存储新数据 content.writerow([sir_a[step - 1], sir_b[step - 1], sir_c[step - 1], sir_d[step - 1]]) file.close() file1.close() ''' 输出n次实验的平均值 ''' for top_k in range(5, 20, 5): sir_results = pd.read_csv('result/Top-' + str(top_k) + ' SIR.csv') # 多种方法的排序结果 print('Top-' + str(top_k) + ' SIR(Average)') print(sir_results.describe(include='all').loc['mean']) # 输出数据特征 ```