Skip to content

Instantly share code, notes, and snippets.

@Vyryn
Last active December 11, 2020 08:14
Show Gist options
  • Select an option

  • Save Vyryn/44c40ff7b3c925d334f920dba00a5bc8 to your computer and use it in GitHub Desktop.

Select an option

Save Vyryn/44c40ff7b3c925d334f920dba00a5bc8 to your computer and use it in GitHub Desktop.
Meros cryptocurrency RPC socket interpretation in Python
# By Vyryn, licensed under GNU GPL.
# Some basic Meros testnet RPC communication to make it a little more convenient to monitor network status and
# test transaction success timing. A few basic async functions and what amounts to terrible examples of their use.
import json
import asyncio
# Host and port that rpc() connects to.
URL = "10.0.0.109"
PORT = 5133
# Create the event loop explicitly: calling asyncio.get_event_loop() when no loop
# is running has been deprecated since Python 3.10 and raises RuntimeError on the
# newest releases. Registering it keeps `loop` the current loop, as before.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def read_socket_response(reader: asyncio.StreamReader) -> str:
    """Read exactly one JSON object or array from *reader* and return it as text.

    The stream carries no length prefix or delimiter, so the end of the reply is
    found by counting the brackets that match the opening one. Brackets that
    appear inside JSON string literals are ignored, and leading whitespace
    before the opening bracket is skipped.

    Args:
        reader: Stream to read the reply from.

    Returns:
        The reply, from its opening bracket through the matching closing one,
        decoded as UTF-8.

    Raises:
        asyncio.IncompleteReadError: The stream ended before the reply was complete.
        ValueError: The reply does not start with '{' or '['.
        UnicodeDecodeError: The reply is not valid UTF-8.
    """
    closers = {b'{': b'}', b'[': b']'}
    buf = bytearray()
    opener = closer = None
    depth = 0
    in_string = False
    escaped = False
    while True:
        # One byte at a time so nothing past the end of this reply is consumed.
        byte = await reader.read(1)
        if not byte:
            # read() returns b'' at EOF; without this check the loop never ends.
            raise asyncio.IncompleteReadError(bytes(buf), None)
        if opener is None:
            if byte.isspace():
                continue
            if byte not in closers:
                raise ValueError(
                    "expected a JSON object or array, got %r" % byte)
            opener, closer = byte, closers[byte]
        buf += byte
        # Scanning raw bytes is safe: in UTF-8, ASCII values never occur inside
        # a multi-byte sequence. Decoding happens once, on the whole reply.
        if in_string:
            if escaped:
                escaped = False
            elif byte == b'\\':
                escaped = True
            elif byte == b'"':
                in_string = False
        elif byte == b'"':
            in_string = True
        elif byte == opener:
            depth += 1
        elif byte == closer:
            depth -= 1
            if not depth:
                return buf.decode("UTF-8")
async def rpc(method, params=None):
    """Send one JSON-RPC 2.0 request to URL:PORT and print the reply.

    A fresh connection is opened for the call and closed afterwards.

    Args:
        method: Name of the RPC method to call, e.g. 'merit_getUnlockedMerit'.
        params: List of parameters for the method. Defaults to an empty list.
    """
    # None sentinel instead of a mutable default list shared between calls.
    if params is None:
        params = []
    payload = json.dumps({"jsonrpc": "2.0", "id": 0, "params": params, "method": method})
    print('Connecting...')
    # No explicit loop argument: open_connection() uses the running loop, and
    # the `loop` parameter was removed in Python 3.10.
    reader, writer = await asyncio.open_connection(host=URL, port=PORT)
    try:
        print('Connected.')
        writer.write(payload.encode("UTF-8"))
        # Wait until the request has actually been flushed to the socket.
        await writer.drain()
        print('Sent.')
        print(await read_socket_response(reader))
    finally:
        # Always release the socket, even if sending or reading failed.
        writer.close()
        await writer.wait_closed()
# Example queries, executed one after another on the module-level event loop.
# Each entry is the positional argument list handed to rpc().
for _call_args in (
    ('merit_getUnlockedMerit',),
    ('transactions_getBalance',
     ['mr1qz7959tux0jepjj4ksqc5lzny2mnwyfvsufdvwkgkvzrrcr9u2r050csy4z']),
):
    loop.run_until_complete(rpc(*_call_args))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment