Skip to content

Instantly share code, notes, and snippets.

@Vyryn
Created February 25, 2021 06:18
Show Gist options
  • Select an option

  • Save Vyryn/a6737a9e3439da2440978b4590971784 to your computer and use it in GitHub Desktop.

Select an option

Save Vyryn/a6737a9e3439da2440978b4590971784 to your computer and use it in GitHub Desktop.
# By Vyryn, licensed under GNU GPL.
# Some basic Meros testnet RPC communication to make it a little more convinient to monitor network status and
# test transaction success timing. A few basic async functions and what amounts to terrible examples of their use.
import json
import asyncio
import time
URL = "10.0.0.109"
PORT = 5133
addr_1 = 'mr1qz7959tux0jepjj4ksqc5lzny2mnwyfvsufdvwkgkvzrrcr9u2r050csy4z'
addr_2 = 'mr1qpxdjdgkgzju9nur50wcu8qtxhcnd5m9mmj0x3m6nfdmv09uu96rcwmp9m5'
addr_3 = 'mr1qpe7y74w59syuxzkrjpzfth8v4xdkxd5lzqz72z6jagp3gesncjh6f66uxx'
loop = asyncio.get_event_loop()
initial_time = time.time()
async def read_socket_response(reader: asyncio.StreamReader) -> str:
resp = ''
brak = 0
while True:
resp += str(await reader.read(1), "UTF-8")
if resp[-1] == resp[0]:
brak += 1
elif resp[0] == '{' and resp[-1] == '}':
brak -= 1
elif resp[0] == '[' and resp[-1] == ']':
brak -= 1
if not brak:
break
return resp
async def rpc(method, params=[], disp=False) -> str:
if not isinstance(params, list):
params = [params]
payload = str(json.dumps({"jsonrpc": "2.0", "id": 0, "params": params, "method": method}))
reader, writer = await asyncio.open_connection(host=URL, port=PORT, loop=loop)
writer.write(bytes(payload, "UTF-8"))
response = json.loads((await read_socket_response(reader)))
if disp:
if params and 'result' in response:
print(f"{method} {params} >>> {response['result']}")
elif 'result' in response:
print(f"{method} >>> {response['result']}")
else:
print(f'{method} {params} >>> {response}')
writer.close()
await writer.wait_closed()
if 'result' in response:
return response['result']
else:
return response
async def get_addr(account: int = 0) -> str:
result = await rpc('personal_getAddress', [account, False])
return result
async def send(to: str, amount=1) -> str:
# result = await rpc('personal_send', [to, str(amount)])
result = await rpc('personal_data', 'Test data transaction')
return result
async def send_n_transactions(n=1, tps=1) -> [str]:
per = 1 / tps
hashes = []
for i in range(n):
hashes.append(await send(addr_2))
if i not in [0, n - 1]: # Don't wait on first or last
await asyncio.sleep(delay=per)
return hashes
async def show_transactions(hash_list: [str]) -> None:
for hash_ in hash_list:
trx_info = await rpc('transactions_getTransaction', hash_)
trx_status = await rpc('consensus_getStatus', hash_)
print(trx_info['outputs'])
print(trx_status)
async def wait_for_all_confirmed(hash_list: [str]):
start_time = initial_time
comp_hashes = []
to_complete = len(hash_list)
complete = 0
while len(hash_list) > 0:
for hash_ in hash_list:
trx_status = await rpc('consensus_getStatus', hash_)
if 'verified' in trx_status:
if trx_status['verified']:
hash_list.remove(hash_)
comp_hashes.append(hash_)
complete += 1
else:
print(f'Malformed trx_status: {trx_status}')
print(f'{complete}/{to_complete} transactions verified after {round((time.time() - start_time) * 10) / 10} '
f'seconds.')
if time.time() - start_time > 30:
print(f'Pending hashes: {hash_list}')
print(f'Completed hashes: {comp_hashes}')
await asyncio.sleep(1)
pass
end_time = time.time()
return end_time - start_time
# print('Peers: ' + str(loop.run_until_complete(rpc('network_getPeers'))))
merit_dist = True
if merit_dist:
unlocked_merit = loop.run_until_complete(rpc('merit_getUnlockedMerit'))
print(f'Unlocked Merit:\t{unlocked_merit}')
peers = {}
for i in range(30):
peers[i] = loop.run_until_complete(rpc('merit_getMerit', i))
for key, value in peers.items():
merit = value['merit']
if merit > 0:
print(f'{key}\t{merit}')
print(f"Peers: {[str(key) + ':' + str(value['merit']) for key, value in peers.items() if value['merit'] > 0]}")
print('My balance: ' + loop.run_until_complete(rpc('transactions_getBalance', addr_1)))
# print('My address: ' + loop.run_until_complete(get_addr(0)))
# loop.run_until_complete(get_addr(1))
# loop.run_until_complete(get_addr(2))
spam = False
if spam:
target_tps = 1000
n = 2
broadcast_start_time = time.time()
hashes = loop.run_until_complete(send_n_transactions(n=n, tps=target_tps))
broadcast_end_time = time.time()
achieved_tps = n / (broadcast_end_time - broadcast_start_time)
print(f'Broadcast {n} transactions at {round(achieved_tps * 100) / 100} tps.')
delta_t = loop.run_until_complete(wait_for_all_confirmed(hashes))
print(f'Time taken = {round(delta_t * 100) / 100} seconds.')
# loop.run_until_complete(show_transactions(hashes))
# print('Waiting 1 second...')
# loop.run_until_complete(asyncio.sleep(1))
# loop.run_until_complete(show_transactions(hashes))
# loop.run_until_complete((asyncio.sleep(4)))
# loop.run_until_complete(show_transactions(hashes))
print(f"Blockchain height: {loop.run_until_complete(rpc('merit_getHeight'))}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment